Merge branch 'master' into EMQX-871-872

This commit is contained in:
x1001100011 2021-09-12 15:21:10 -07:00
commit 7a7cccb337
46 changed files with 1074 additions and 502 deletions

View File

@ -2,10 +2,10 @@ name: API Test Suite
on:
push:
tags:
- e*
- v*
pull_request:
tags:
- e*
- v*
pull_request:
jobs:
build:
@ -36,6 +36,9 @@ jobs:
script_name:
- api_metrics
- api_subscriptions
- api_clients
- api_routes
- api_publish
steps:
- uses: actions/checkout@v2
with:

View File

@ -5,7 +5,7 @@ BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.11
export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.13
ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none
endif

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "5.0-alpha.5"}).
-define(EMQX_RELEASE, {opensource, "5.0-alpha.6"}).
-else.

View File

@ -15,7 +15,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}

View File

@ -78,6 +78,25 @@
-define(VER_1, <<"1">>).
-define(VER_2, <<"2">>).
-type chain_name() :: atom().
-type authenticator_id() :: binary().
-type position() :: top | bottom | {before, authenticator_id()}.
-type update_request() :: {create_authenticator, chain_name(), map()}
| {delete_authenticator, chain_name(), authenticator_id()}
| {update_authenticator, chain_name(), authenticator_id(), map()}
| {move_authenticator, chain_name(), authenticator_id(), position()}.
-type authn_type() :: atom() | {atom(), atom()}.
-type provider() :: module().
-type chain() :: #{name := chain_name(),
authenticators := [authenticator()]}.
-type authenticator() :: #{id := authenticator_id(),
provider := provider(),
enable := boolean(),
state := map()}.
-type config() :: #{atom() => term()}.
-type state() :: #{atom() => term()}.
-type extra() :: #{is_superuser := boolean(),
@ -128,7 +147,12 @@
-callback update_user(UserID, UserInfo, State)
-> {ok, User}
| {error, term()}
when UserID::binary, UserInfo::map(), State::state(), User::user_info().
when UserID::binary(), UserInfo::map(), State::state(), User::user_info().
-callback lookup_user(UserID, UserInfo, State)
-> {ok, User}
| {error, term()}
when UserID::binary(), UserInfo::map(), State::state(), User::user_info().
-callback list_users(State)
-> {ok, Users}
@ -138,6 +162,7 @@
, add_user/2
, delete_user/2
, update_user/3
, lookup_user/3
, list_users/1
]).
@ -159,6 +184,8 @@ authentication(_) -> undefined.
%% Callbacks of config handler
%%------------------------------------------------------------------------------
-spec pre_config_update(update_request(), emqx_config:raw_config())
-> {ok, map() | list()} | {error, term()}.
pre_config_update(UpdateReq, OldConfig) ->
case do_pre_config_update(UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
@ -185,22 +212,22 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
{error, Reason} -> {error, Reason};
{ok, Part1, [Found | Part2]} ->
case Position of
<<"top">> ->
top ->
{ok, [Found | Part1] ++ Part2};
<<"bottom">> ->
bottom ->
{ok, Part1 ++ Part2 ++ [Found]};
<<"before:", Before/binary>> ->
{before, Before} ->
case split_by_id(Before, Part1 ++ Part2) of
{error, Reason} ->
{error, Reason};
{ok, NPart1, [NFound | NPart2]} ->
{ok, NPart1 ++ [Found, NFound | NPart2]}
end;
_ ->
{error, {invalid_parameter, position}}
end
end
end.
-spec post_config_update(update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
-> ok | {ok, map()} | {error, term()}.
post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, check_config(to_list(NewConfig)), OldConfig, AppEnvs).
@ -220,13 +247,7 @@ do_post_config_update({update_authenticator, ChainName, AuthenticatorID, _Config
update_authenticator(ChainName, AuthenticatorID, NConfig);
do_post_config_update({move_authenticator, ChainName, AuthenticatorID, Position}, _NewConfig, _OldConfig, _AppEnvs) ->
NPosition = case Position of
<<"top">> -> top;
<<"bottom">> -> bottom;
<<"before:", Before/binary>> ->
{before, Before}
end,
move_authenticator(ChainName, AuthenticatorID, NPosition).
move_authenticator(ChainName, AuthenticatorID, Position).
check_config(Config) ->
#{authentication := CheckedConfig} = hocon_schema:check_plain(emqx_authentication,
@ -269,6 +290,7 @@ do_authenticate([#authenticator{provider = Provider, state = State} | More], Cre
%% APIs
%%------------------------------------------------------------------------------
-spec initialize_authentication(chain_name(), [#{binary() => term()}]) -> ok.
initialize_authentication(_, []) ->
ok;
initialize_authentication(ChainName, AuthenticatorsConfig) ->
@ -283,43 +305,56 @@ initialize_authentication(ChainName, AuthenticatorsConfig) ->
end
end, CheckedConfig).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec stop() -> ok.
stop() ->
gen_server:stop(?MODULE).
-spec get_refs() -> {ok, Refs} when Refs :: [{authn_type(), module()}].
get_refs() ->
gen_server:call(?MODULE, get_refs).
-spec add_provider(authn_type(), module()) -> ok.
add_provider(AuthNType, Provider) ->
gen_server:call(?MODULE, {add_provider, AuthNType, Provider}).
-spec remove_provider(authn_type()) -> ok.
remove_provider(AuthNType) ->
gen_server:call(?MODULE, {remove_provider, AuthNType}).
-spec create_chain(chain_name()) -> {ok, chain()} | {error, term()}.
create_chain(Name) ->
gen_server:call(?MODULE, {create_chain, Name}).
-spec delete_chain(chain_name()) -> ok | {error, term()}.
delete_chain(Name) ->
gen_server:call(?MODULE, {delete_chain, Name}).
-spec lookup_chain(chain_name()) -> {ok, chain()} | {error, term()}.
lookup_chain(Name) ->
gen_server:call(?MODULE, {lookup_chain, Name}).
-spec list_chains() -> {ok, [chain()]}.
list_chains() ->
Chains = ets:tab2list(?CHAINS_TAB),
{ok, [serialize_chain(Chain) || Chain <- Chains]}.
-spec create_authenticator(chain_name(), config()) -> {ok, authenticator()} | {error, term()}.
create_authenticator(ChainName, Config) ->
gen_server:call(?MODULE, {create_authenticator, ChainName, Config}).
-spec delete_authenticator(chain_name(), authenticator_id()) -> ok | {error, term()}.
delete_authenticator(ChainName, AuthenticatorID) ->
gen_server:call(?MODULE, {delete_authenticator, ChainName, AuthenticatorID}).
-spec update_authenticator(chain_name(), authenticator_id(), config()) -> {ok, authenticator()} | {error, term()}.
update_authenticator(ChainName, AuthenticatorID, Config) ->
gen_server:call(?MODULE, {update_authenticator, ChainName, AuthenticatorID, Config}).
-spec lookup_authenticator(chain_name(), authenticator_id()) -> {ok, authenticator()} | {error, term()}.
lookup_authenticator(ChainName, AuthenticatorID) ->
case ets:lookup(?CHAINS_TAB, ChainName) of
[] ->
@ -333,6 +368,7 @@ lookup_authenticator(ChainName, AuthenticatorID) ->
end
end.
-spec list_authenticators(chain_name()) -> {ok, [authenticator()]} | {error, term()}.
list_authenticators(ChainName) ->
case ets:lookup(?CHAINS_TAB, ChainName) of
[] ->
@ -341,28 +377,36 @@ list_authenticators(ChainName) ->
{ok, serialize_authenticators(Authenticators)}
end.
-spec move_authenticator(chain_name(), authenticator_id(), position()) -> ok | {error, term()}.
move_authenticator(ChainName, AuthenticatorID, Position) ->
gen_server:call(?MODULE, {move_authenticator, ChainName, AuthenticatorID, Position}).
-spec import_users(chain_name(), authenticator_id(), binary()) -> ok | {error, term()}.
import_users(ChainName, AuthenticatorID, Filename) ->
gen_server:call(?MODULE, {import_users, ChainName, AuthenticatorID, Filename}).
-spec add_user(chain_name(), authenticator_id(), user_info()) -> {ok, user_info()} | {error, term()}.
add_user(ChainName, AuthenticatorID, UserInfo) ->
gen_server:call(?MODULE, {add_user, ChainName, AuthenticatorID, UserInfo}).
-spec delete_user(chain_name(), authenticator_id(), binary()) -> ok | {error, term()}.
delete_user(ChainName, AuthenticatorID, UserID) ->
gen_server:call(?MODULE, {delete_user, ChainName, AuthenticatorID, UserID}).
-spec update_user(chain_name(), authenticator_id(), binary(), map()) -> {ok, user_info()} | {error, term()}.
update_user(ChainName, AuthenticatorID, UserID, NewUserInfo) ->
gen_server:call(?MODULE, {update_user, ChainName, AuthenticatorID, UserID, NewUserInfo}).
-spec lookup_user(chain_name(), authenticator_id(), binary()) -> {ok, user_info()} | {error, term()}.
lookup_user(ChainName, AuthenticatorID, UserID) ->
gen_server:call(?MODULE, {lookup_user, ChainName, AuthenticatorID, UserID}).
%% TODO: Support pagination
-spec list_users(chain_name(), authenticator_id()) -> {ok, [user_info()]} | {error, term()}.
list_users(ChainName, AuthenticatorID) ->
gen_server:call(?MODULE, {list_users, ChainName, AuthenticatorID}).
-spec generate_id(config()) -> authenticator_id().
generate_id(#{mechanism := Mechanism0, backend := Backend0}) ->
Mechanism = atom_to_binary(Mechanism0),
Backend = atom_to_binary(Backend0),
@ -484,7 +528,7 @@ handle_call({update_authenticator, ChainName, AuthenticatorID, Config}, _From, S
{error, Reason}
end;
false ->
{error, mechanism_or_backend_change_is_not_alloed}
{error, change_of_authentication_type_is_not_allowed}
end
end
end,
@ -679,7 +723,7 @@ call_authenticator(ChainName, AuthenticatorID, Func, Args) ->
true ->
erlang:apply(Provider, Func, Args ++ [State]);
false ->
{error, unsupported_feature}
{error, unsupported_operation}
end
end
end,

View File

@ -253,7 +253,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
end.
delete_authentication(Type, ListenerName, _Conf) ->
emqx_authentication:delete_chain(atom_to_binary(listener_id(Type, ListenerName))).
emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
%% Update the listeners at runtime
post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->

View File

@ -69,33 +69,82 @@
cipher/0,
comma_separated_atoms/0]).
-export([namespace/0, roots/0, fields/1]).
-export([namespace/0, roots/0, roots/1, fields/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ssl/1]).
namespace() -> undefined.
roots() ->
["zones",
"mqtt",
"flapping_detect",
"force_shutdown",
"force_gc",
"conn_congestion",
"rate_limit",
"quota",
{"listeners",
%% TODO change config importance to a field metadata
roots(high) ++ roots(medium) ++ roots(low).
roots(high) ->
[ {"listeners",
sc(ref("listeners"),
#{ desc => "MQTT listeners identified by their protocol type and assigned names. "
"The listeners enabled by default are named with 'default'"})
},
"broker",
"plugins",
"stats",
"sysmon",
"alarm",
"authorization",
{"authentication", sc(hoconsc:lazy(hoconsc:array(map())), #{})}
#{ desc => "MQTT listeners identified by their protocol type and assigned names"
})
}
, {"zones",
sc(map("name", ref("zone")),
#{ desc => "A zone is a set of configs grouped by the zone <code>name</code>. <br>"
"For flexible configuration mapping, the <code>name</code> "
"can be set to a listener's <code>zone</code> config.<br>"
"NOTE: A builtin zone named <code>default</code> is auto created "
"and can not be deleted."
})}
, {"mqtt",
sc(ref("mqtt"),
#{ desc => "Global MQTT configuration.<br>"
"The configs here work as default values which can be overriden "
"in <code>zone</code> configs"
})}
, {"authentication",
sc(hoconsc:lazy(hoconsc:array(map())),
#{ desc => "Default authentication configs for all MQTT listeners.<br>"
"For per-listener overrides see <code>authentication</code> "
"in listener configs"
})}
, {"authorization",
sc(ref("authorization"),
#{})}
];
roots(medium) ->
[ {"broker",
sc(ref("broker"),
#{})}
, {"rate_limit",
sc(ref("rate_limit"),
#{})}
, {"force_shutdown",
sc(ref("force_shutdown"),
#{})}
];
roots(low) ->
[ {"force_gc",
sc(ref("force_gc"),
#{})}
, {"conn_congestion",
sc(ref("conn_congestion"),
#{})}
, {"quota",
sc(ref("quota"),
#{})}
, {"plugins", %% TODO: move to emqx_machine_schema
sc(ref("plugins"),
#{})}
, {"stats",
sc(ref("stats"),
#{})}
, {"sysmon",
sc(ref("sysmon"),
#{})}
, {"alarm",
sc(ref("alarm"),
#{})}
, {"flapping_detect",
sc(ref("flapping_detect"),
#{})}
].
fields("stats") ->
@ -117,8 +166,7 @@ fields("authorization") ->
, {"cache",
sc(ref(?MODULE, "cache"),
#{
})
}
})}
];
fields("cache") ->
@ -270,14 +318,7 @@ fields("mqtt") ->
})}
];
fields("zones") ->
[ {"$name",
sc(ref("zone_settings"),
#{
}
)}];
fields("zone_settings") ->
fields("zone") ->
Fields = ["mqtt", "stats", "flapping_detect", "force_shutdown",
"conn_congestion", "rate_limit", "quota", "force_gc"],
[{F, ref(emqx_zone_schema, F)} || F <- Fields];
@ -375,48 +416,37 @@ fields("force_gc") ->
fields("listeners") ->
[ {"tcp",
sc(ref("tcp_listeners"),
sc(map(name, ref("mqtt_tcp_listener")),
#{ desc => "TCP listeners"
, nullable => {true, recursive}
})
}
, {"ssl",
sc(ref("ssl_listeners"),
sc(map(name, ref("mqtt_ssl_listener")),
#{ desc => "SSL listeners"
, nullable => {true, recursive}
})
}
, {"ws",
sc(ref("ws_listeners"),
sc(map(name, ref("mqtt_ws_listener")),
#{ desc => "HTTP websocket listeners"
, nullable => {true, recursive}
})
}
, {"wss",
sc(ref("wss_listeners"),
sc(map(name, ref("mqtt_wss_listener")),
#{ desc => "HTTPS websocket listeners"
, nullable => {true, recursive}
})
}
, {"quic",
sc(ref("quic_listeners"),
sc(map(name, ref("mqtt_quic_listener")),
#{ desc => "QUIC listeners"
, nullable => {true, recursive}
})
}
];
fields("tcp_listeners") ->
[ {"$name", ref("mqtt_tcp_listener")}
];
fields("ssl_listeners") ->
[ {"$name", ref("mqtt_ssl_listener")}
];
fields("ws_listeners") ->
[ {"$name", ref("mqtt_ws_listener")}
];
fields("wss_listeners") ->
[ {"$name", ref("mqtt_wss_listener")}
];
fields("quic_listeners") ->
[ {"$name", ref("mqtt_quic_listener")}
];
fields("mqtt_tcp_listener") ->
[ {"tcp",
sc(ref("tcp_opts"),
@ -1011,6 +1041,8 @@ ceiling(X) ->
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
map(Name, Type) -> hoconsc:map(Name, Type).
ref(Field) -> hoconsc:ref(?MODULE, Field).
ref(Module, Field) -> hoconsc:ref(Module, Field).

View File

@ -206,7 +206,7 @@ t_update_config(_) ->
?assertMatch({ok, _}, update_config([authentication], {update_authenticator, Global, ID1, #{}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(Global, ID1)),
?assertMatch({ok, _}, update_config([authentication], {move_authenticator, Global, ID2, <<"top">>})),
?assertMatch({ok, _}, update_config([authentication], {move_authenticator, Global, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)),
?assertMatch({ok, _}, update_config([authentication], {delete_authenticator, Global, ID1})),
@ -223,7 +223,7 @@ t_update_config(_) ->
?assertMatch({ok, _}, update_config(ConfKeyPath, {update_authenticator, ListenerID, ID1, #{}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(ListenerID, ID1)),
?assertMatch({ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, <<"top">>})),
?assertMatch({ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ListenerID)),
?assertMatch({ok, _}, update_config(ConfKeyPath, {delete_authenticator, ListenerID, ID1})),

View File

@ -37,7 +37,7 @@
-define(EXAMPLE_1, #{mechanism => <<"password-based">>,
backend => <<"built-in-database">>,
query => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>,
user_id_type => <<"username">>,
password_hash_algorithm => #{
name => <<"sha256">>
}}).
@ -1193,10 +1193,10 @@ definitions() ->
enum => [<<"built-in-database">>],
example => <<"built-in-database">>
},
query => #{
user_id_type => #{
type => string,
default => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>,
example => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>
enum => [<<"username">>, <<"clientid">>],
example => <<"username">>
},
password_hash_algorithm => minirest:ref(<<"PasswordHashAlgorithm">>)
}
@ -1550,7 +1550,8 @@ definitions() ->
enable_pipelining => #{
type => boolean,
default => true
}
},
ssl => minirest:ref(<<"SSL">>)
}
},
@ -1584,6 +1585,10 @@ definitions() ->
certificate => #{
type => string
},
endpoint => #{
type => string,
example => <<"http://localhost:80">>
},
verify_claims => #{
type => object,
additionalProperties => #{
@ -1631,6 +1636,7 @@ definitions() ->
},
salt_rounds => #{
type => integer,
description => <<"Only valid when the name field is set to bcrypt">>,
default => 10
}
}
@ -1857,8 +1863,7 @@ list_authenticator(ConfKeyPath, AuthenticatorID) ->
update_authenticator(ConfKeyPath, ChainName0, AuthenticatorID, Config) ->
ChainName = to_atom(ChainName0),
case update_config(ConfKeyPath,
{update_authenticator, ChainName, AuthenticatorID, Config}) of
case update_config(ConfKeyPath, {update_authenticator, ChainName, AuthenticatorID, Config}) of
{ok, #{post_config_update := #{?AUTHN := #{id := ID}},
raw_config := AuthenticatorsConfig}} ->
{ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
@ -1878,10 +1883,15 @@ delete_authenticator(ConfKeyPath, ChainName0, AuthenticatorID) ->
move_authenitcator(ConfKeyPath, ChainName0, AuthenticatorID, Position) ->
ChainName = to_atom(ChainName0),
case update_config(ConfKeyPath, {move_authenticator, ChainName, AuthenticatorID, Position}) of
{ok, _} ->
{204};
{error, {_, _, Reason}} ->
case parse_position(Position) of
{ok, NPosition} ->
case update_config(ConfKeyPath, {move_authenticator, ChainName, AuthenticatorID, NPosition}) of
{ok, _} ->
{204};
{error, {_, _, Reason}} ->
serialize_error(Reason)
end;
{error, Reason} ->
serialize_error(Reason)
end.
@ -1963,29 +1973,69 @@ fill_defaults(Config) ->
serialize_error({not_found, {authenticator, ID}}) ->
{404, #{code => <<"NOT_FOUND">>,
message => list_to_binary(io_lib:format("Authenticator '~s' does not exist", [ID]))}};
message => list_to_binary(
io_lib:format("Authenticator '~s' does not exist", [ID])
)}};
serialize_error({not_found, {listener, ID}}) ->
{404, #{code => <<"NOT_FOUND">>,
message => list_to_binary(io_lib:format("Listener '~s' does not exist", [ID]))}};
message => list_to_binary(
io_lib:format("Listener '~s' does not exist", [ID])
)}};
serialize_error({not_found, {chain, ?GLOBAL}}) ->
{500, #{code => <<"INTERNAL_SERVER_ERROR">>,
message => <<"Authentication status is abnormal">>}};
serialize_error({not_found, {chain, Name}}) ->
{400, #{code => <<"BAD_REQUEST">>,
message => list_to_binary(
io_lib:format("No authentication has been create for listener '~s'", [Name])
)}};
serialize_error({already_exists, {authenticator, ID}}) ->
{409, #{code => <<"ALREADY_EXISTS">>,
message => list_to_binary(
io_lib:format("Authenticator '~s' already exist", [ID])
)}};
serialize_error(no_available_provider) ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"Unsupported authentication type">>}};
serialize_error(change_of_authentication_type_is_not_allowed) ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"Change of authentication type is not allowed">>}};
serialize_error(unsupported_operation) ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"Operation not supported in this authentication type">>}};
serialize_error({missing_parameter, Name}) ->
{400, #{code => <<"MISSING_PARAMETER">>,
message => list_to_binary(
io_lib:format("The input parameter '~p' that is mandatory for processing this request is not supplied", [Name])
)}};
serialize_error({invalid_parameter, Name}) ->
{400, #{code => <<"INVALID_PARAMETER">>,
message => list_to_binary(
io_lib:format("The value of input parameter '~p' is invalid", [Name])
)}};
serialize_error(Reason) ->
{400, #{code => <<"BAD_REQUEST">>,
message => list_to_binary(io_lib:format("~p", [Reason]))}}.
parse_position(<<"top">>) ->
{ok, top};
parse_position(<<"bottom">>) ->
{ok, bottom};
parse_position(<<"before:", Before/binary>>) ->
{ok, {before, Before}};
parse_position(_) ->
{error, {invalid_parameter, position}}.
to_list(M) when is_map(M) ->
[M];
to_list(L) when is_list(L) ->

View File

@ -214,7 +214,7 @@ default_headers_no_content_type() ->
transform_header_name(Headers) ->
maps:fold(fun(K0, V, Acc) ->
K = list_to_binary(string:to_lower(binary_to_list(K0))),
K = list_to_binary(string:to_lower(to_list(K0))),
maps:put(K, V, Acc)
end, #{}, Headers).
@ -301,3 +301,8 @@ parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
{ok, maps:from_list(cow_qs:parse_qs(Body))};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.
to_list(A) when is_atom(A) ->
atom_to_list(A);
to_list(B) when is_binary(B) ->
binary_to_list(B).

View File

@ -22,7 +22,7 @@ authorization {
# certfile: "{{ platform_etc_dir }}/certs/client-cert.pem"
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
# }
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
# query: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
# },
# {
# type: pgsql
@ -33,7 +33,7 @@ authorization {
# password: public
# auto_reconnect: true
# ssl: {enable: false}
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
# query: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
# },
# {
# type: redis
@ -53,7 +53,7 @@ authorization {
# database: mqtt
# ssl: {enable: false}
# collection: mqtt_authz
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
# selector: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
# },
{
type: file

View File

@ -290,7 +290,7 @@ init_source(#{enable := true,
end;
init_source(#{enable := true,
type := DB,
sql := SQL
query := SQL
} = Source) when DB =:= mysql;
DB =:= pgsql ->
Mod = authz_module(DB),
@ -298,7 +298,7 @@ init_source(#{enable := true,
{error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations =>
#{id => Id,
sql => Mod:parse_query(SQL)
query => Mod:parse_query(SQL)
}
}
end;

View File

@ -95,7 +95,7 @@ definitions() ->
},
method => #{
type => string,
enum => [<<"get">>, <<"post">>, <<"put">>],
enum => [<<"get">>, <<"post">>],
example => <<"get">>
},
headers => #{type => object},
@ -118,7 +118,7 @@ definitions() ->
required => [ type
, enable
, collection
, find
, selector
, mongo_type
, server
, pool_size
@ -140,7 +140,7 @@ definitions() ->
example => true
},
collection => #{type => string},
find => #{type => object},
selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"single">>],
example => <<"single">>},
@ -173,7 +173,7 @@ definitions() ->
required => [ type
, enable
, collection
, find
, selector
, mongo_type
, servers
, replica_set_name
@ -196,7 +196,7 @@ definitions() ->
example => true
},
collection => #{type => string},
find => #{type => object},
selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"rs">>],
example => <<"rs">>},
@ -231,7 +231,7 @@ definitions() ->
required => [ type
, enable
, collection
, find
, selector
, mongo_type
, servers
, pool_size
@ -253,7 +253,7 @@ definitions() ->
example => true
},
collection => #{type => string},
find => #{type => object},
selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"sharded">>],
example => <<"sharded">>},
@ -286,7 +286,7 @@ definitions() ->
type => object,
required => [ type
, enable
, sql
, query
, server
, database
, pool_size
@ -305,7 +305,7 @@ definitions() ->
type => boolean,
example => true
},
sql => #{type => string},
query => #{type => string},
server => #{type => string,
example => <<"127.0.0.1:3306">>
},
@ -323,7 +323,7 @@ definitions() ->
type => object,
required => [ type
, enable
, sql
, query
, server
, database
, pool_size
@ -342,7 +342,7 @@ definitions() ->
type => boolean,
example => true
},
sql => #{type => string},
query => #{type => string},
server => #{type => string,
example => <<"127.0.0.1:5432">>
},
@ -484,7 +484,7 @@ definitions() ->
type => array,
items => #{
type => string,
example => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>
example => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}
},
path => #{

View File

@ -32,9 +32,8 @@
-define(EXAMPLE_FILE,
#{type=> file,
enable => true,
rules => [<<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>,
<<"{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
]}).
rules => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}).
-define(EXAMPLE_RETURNED_REDIS,
maps:put(annotations, #{status => healthy}, ?EXAMPLE_REDIS)
@ -350,9 +349,7 @@ sources(put, #{body := Body}) when is_list(Body) ->
NBody = [ begin
case Source of
#{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable} ->
{ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]),
erlang:list_to_bitstring([<<Rule/binary, "\n">> || Rule <- Rules])
),
{ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]), Rules),
#{type => file, enable => Enable, path => Filename};
_ -> write_cert(Source)
end
@ -396,9 +393,7 @@ source(get, #{bindings := #{type := Type}}) ->
{200, read_cert(NSource2)}
end;
source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) ->
{ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""),
erlang:list_to_bitstring([<<Rule/binary, "\n">> || Rule <- Rules])
),
{ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules),
case emqx_authz:update({replace_once, file}, #{type => file, enable => Enable, path => Filename}) of
{ok, _} -> {204};
{error, Reason} ->
@ -457,21 +452,21 @@ write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
true ->
{ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"cacertfile">>, SSL)),
CaCertFile;
false -> ""
end,
Cert = case maps:is_key(<<"certfile">>, SSL) of
true ->
{ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"certfile">>, SSL)),
CertFile;
false -> ""
end,
Key = case maps:is_key(<<"keyfile">>, SSL) of
true ->
{ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"keyfile">>, SSL)),
KeyFile;
false -> ""

View File

@ -35,10 +35,10 @@ description() ->
authorize(Client, PubSub, Topic,
#{collection := Collection,
find := Find,
selector := Selector,
annotations := #{id := ResourceID}
}) ->
case emqx_resource:query(ResourceID, {find, Collection, replvar(Find, Client), #{}}) of
case emqx_resource:query(ResourceID, {find, Collection, replvar(Selector, Client), #{}}) of
{error, Reason} ->
?LOG(error, "[AuthZ] Query mongo error: ~p", [Reason]),
nomatch;
@ -57,7 +57,7 @@ do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
end.
replvar(Find, #{clientid := Clientid,
replvar(Selector, #{clientid := Clientid,
username := Username,
peerhost := IpAddress
}) ->
@ -76,7 +76,7 @@ replvar(Find, #{clientid := Clientid,
maps:put(K, V3, AccIn);
_Fun(K, V, AccIn) -> maps:put(K, V, AccIn)
end,
maps:fold(Fun, #{}, Find).
maps:fold(Fun, #{}, Selector).
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(B) when is_binary(B) -> B;

View File

@ -47,10 +47,10 @@ parse_query(Sql) ->
authorize(Client, PubSub, Topic,
#{annotations := #{id := ResourceID,
sql := {SQL, Params}
query := {Query, Params}
}
}) ->
case emqx_resource:query(ResourceID, {sql, SQL, replvar(Params, Client)}) of
case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
{ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);

View File

@ -51,10 +51,10 @@ parse_query(Sql) ->
authorize(Client, PubSub, Topic,
#{annotations := #{id := ResourceID,
sql := {SQL, Params}
query := {Query, Params}
}
}) ->
case emqx_resource:query(ResourceID, {sql, SQL, replvar(Params, Client)}) of
case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
{ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);

View File

@ -66,7 +66,7 @@ fields(http_get) ->
},
converter => fun (Headers0) ->
Headers1 = maps:fold(fun(K0, V, AccIn) ->
K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
K1 = iolist_to_binary(string:to_lower(to_list(K0))),
maps:put(K1, V, AccIn)
end, #{}, Headers0),
maps:merge(#{ <<"accept">> => <<"application/json">>
@ -84,7 +84,7 @@ fields(http_post) ->
, {enable, #{type => boolean(),
default => true}}
, {url, #{type => url()}}
, {method, #{type => hoconsc:enum([post, put]),
, {method, #{type => post,
default => get}}
, {headers, #{type => map(),
default => #{ <<"accept">> => <<"application/json">>
@ -116,24 +116,24 @@ fields(http_post) ->
fields(mongo_single) ->
connector_fields(mongo, single) ++
[ {collection, #{type => atom()}}
, {find, #{type => map()}}
, {selector, #{type => map()}}
];
fields(mongo_rs) ->
connector_fields(mongo, rs) ++
[ {collection, #{type => atom()}}
, {find, #{type => map()}}
, {selector, #{type => map()}}
];
fields(mongo_sharded) ->
connector_fields(mongo, sharded) ++
[ {collection, #{type => atom()}}
, {find, #{type => map()}}
, {selector, #{type => map()}}
];
fields(mysql) ->
connector_fields(mysql) ++
[ {sql, query()} ];
[ {query, query()} ];
fields(pgsql) ->
connector_fields(pgsql) ++
[ {sql, query()} ];
[ {query, query()} ];
fields(redis_single) ->
connector_fields(redis, single) ++
[ {cmd, query()} ];
@ -177,3 +177,8 @@ connector_fields(DB, Fields) ->
, {enable, #{type => boolean(),
default => true}}
] ++ Mod:fields(Fields).
to_list(A) when is_atom(A) ->
atom_to_list(A);
to_list(B) when is_binary(B) ->
binary_to_list(B).

View File

@ -75,7 +75,7 @@ init_per_testcase(_, Config) ->
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
<<"find">> => #{<<"a">> => <<"b">>}
<<"selector">> => #{<<"a">> => <<"b">>}
}).
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
<<"enable">> => true,
@ -86,7 +86,7 @@ init_per_testcase(_, Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}).
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
<<"enable">> => true,
@ -97,7 +97,7 @@ init_per_testcase(_, Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,

View File

@ -54,7 +54,7 @@
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
<<"find">> => #{<<"a">> => <<"b">>}
<<"selector">> => #{<<"a">> => <<"b">>}
}).
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
<<"enable">> => true,
@ -65,7 +65,7 @@
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}).
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
<<"enable">> => true,
@ -76,7 +76,7 @@
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,
@ -92,10 +92,7 @@
}).
-define(SOURCE6, #{<<"type">> => <<"file">>,
<<"enable">> => true,
<<"rules">> =>
[<<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>,
<<"{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
]
<<"rules">> => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}).
all() ->
@ -151,8 +148,8 @@ set_special_configs(_App) ->
ok.
init_per_testcase(t_api, Config) ->
meck:new(emqx_rule_id, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_rule_id, gen, fun() -> "fake" end),
meck:new(emqx_plugin_libs_id, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_plugin_libs_id, gen, fun() -> "fake" end),
meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx, get_config, fun([node, data_dir]) ->
@ -165,7 +162,7 @@ init_per_testcase(t_api, Config) ->
init_per_testcase(_, Config) -> Config.
end_per_testcase(t_api, _Config) ->
meck:unload(emqx_rule_id),
meck:unload(emqx_plugin_libs_id),
meck:unload(emqx),
ok;
end_per_testcase(_, _Config) -> ok.

View File

@ -53,7 +53,7 @@ init_per_suite(Config) ->
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
<<"find">> => #{<<"a">> => <<"b">>}
<<"selector">> => #{<<"a">> => <<"b">>}
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.

View File

@ -55,7 +55,7 @@ init_per_suite(Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.

View File

@ -55,7 +55,7 @@ init_per_suite(Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"sql">> => <<"abcb">>
<<"query">> => <<"abcb">>
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.

View File

@ -2,11 +2,9 @@
## EMQ X Bridge
##--------------------------------------------------------------------
#bridges.mqtt.my_mqtt_bridge {
#bridges.mqtt.my_mqtt_bridge_to_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`.
# clientid_prefix = "bridge_client:"
# username = "username1"
# password = ""
# clean_start = true
@ -27,8 +25,9 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# ## we will create one MQTT connection for each element of the `in`
# in: [{
# ## we will create one MQTT connection for each element of the `message_in`
# message_in: [{
# ## the `id` will be used as part of the clientid
# id = "pull_msgs_from_aws"
# subscribe_remote_topic = "aws/#"
# subscribe_qos = 1
@ -37,8 +36,9 @@
# qos = "${qos}"
# retain = "${retain}"
# }]
# ## we will create one MQTT connection for each element of the `out`
# out: [{
# ## we will create one MQTT connection for each element of the `message_out`
# message_out: [{
# ## the `id` will be used as part of the clientid
# id = "push_msgs_to_aws"
# subscribe_local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"

View File

@ -58,14 +58,7 @@ fields(config) ->
, {pool_type, fun pool_type/1}
, {pool_size, fun pool_size/1}
, {enable_pipelining, fun enable_pipelining/1}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(ssl_opts) ->
[ {cacertfile, fun cacertfile/1}
, {keyfile, fun keyfile/1}
, {certfile, fun certfile/1}
, {verify, fun verify/1}
].
] ++ emqx_connector_schema_lib:ssl_fields().
validations() ->
[ {check_ssl_opts, fun check_ssl_opts/1} ].
@ -102,23 +95,6 @@ enable_pipelining(type) -> boolean();
enable_pipelining(default) -> true;
enable_pipelining(_) -> undefined.
cacertfile(type) -> string();
cacertfile(nullable) -> true;
cacertfile(_) -> undefined.
keyfile(type) -> string();
keyfile(nullable) -> true;
keyfile(_) -> undefined.
%% TODO: certfile is required
certfile(type) -> string();
certfile(nullable) -> true;
certfile(_) -> undefined.
verify(type) -> boolean();
verify(default) -> false;
verify(_) -> undefined.
%% ===================================================================
on_start(InstId, #{base_url := #{scheme := Scheme,
host := Host,

View File

@ -89,7 +89,8 @@ on_start(InstId, Conf) ->
NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])),
InOutConfigs = check_channel_id_dup(maps:get(message_in, Conf, [])
++ maps:get(message_out, Conf, [])),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
@ -110,7 +111,7 @@ on_stop(InstId, #{}) ->
end.
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `in` and `out` config
%% `message_in` and `message_out` config
on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]),
@ -136,19 +137,19 @@ check_channel_id_dup(Confs) ->
end, Confs),
Confs.
%% this is an `in` bridge
create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix,
#{clientid_prefix := ClientPrefix} = BasicConf) ->
logger:info("creating 'in' channel for: ~p", [BridgeId]),
create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
clientid => clientid(ClientPrefix, BridgeId),
%% this is an `message_in` bridge
create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_in' channel for: ~p", [Id]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),
subscriptions => InConf, forwards => undefined});
%% this is an `out` bridge
create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix,
#{clientid_prefix := ClientPrefix} = BasicConf) ->
logger:info("creating 'out' channel for: ~p", [BridgeId]),
create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
clientid => clientid(ClientPrefix, BridgeId),
%% this is an `message_out` bridge
create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_out' channel for: ~p", [Id]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),
subscriptions => undefined, forwards => OutConf}).
create_sub_bridge(#{name := Name} = Conf) ->
@ -172,7 +173,6 @@ basic_config(#{
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMod,
clientid_prefix := ClientIdPrefix,
username := User,
password := Password,
clean_start := CleanStart,
@ -188,7 +188,6 @@ basic_config(#{
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
bridge_mode => BridgeMod,
clientid_prefix => ClientIdPrefix,
username => User,
password => Password,
clean_start => CleanStart,
@ -203,8 +202,8 @@ basic_config(#{
bridge_name(Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
clientid(Prefix, Id) ->
list_to_binary(str(Prefix) ++ str(Id)).
clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)).
str(A) when is_atom(A) ->
atom_to_list(A);

View File

@ -31,7 +31,6 @@ fields("config") ->
, {reconnect_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, hoconsc:mk(boolean(), #{default => true})}
, {clientid_prefix, hoconsc:mk(string(), #{default => ""})}
, {username, hoconsc:mk(string())}
, {password, hoconsc:mk(string())}
, {clean_start, hoconsc:mk(boolean(), #{default => true})}
@ -39,17 +38,17 @@ fields("config") ->
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
, {in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "in")), #{default => []})}
, {out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "out")), #{default => []})}
, {message_in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_in")), #{default => []})}
, {message_out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_out")), #{default => []})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("in") ->
fields("message_in") ->
[ {subscribe_remote_topic, #{type => binary(), nullable => false}}
, {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
] ++ common_inout_confs();
fields("out") ->
fields("message_out") ->
[ {subscribe_local_topic, #{type => binary(), nullable => false}}
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
] ++ common_inout_confs();

View File

@ -14,13 +14,12 @@ fields("emqx_data_bridge") ->
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
default => []}}];
fields(mysql) -> connector_fields(mysql);
fields(pgsql) -> connector_fields(pgsql);
fields(mongo) -> connector_fields(mongo);
fields(redis) -> connector_fields(redis);
fields(ldap) -> connector_fields(ldap).
fields(mysql) -> connector_fields(emqx_connector_mysql, mysql);
fields(pgsql) -> connector_fields(emqx_connector_pgsql, pgsql);
fields(mongo) -> connector_fields(emqx_connector_mongo, mongo);
fields(redis) -> connector_fields(emqx_connector_redis, redis);
fields(ldap) -> connector_fields(emqx_connector_ldap, ldap).
connector_fields(DB) ->
Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
connector_fields(ConnectModule, DB) ->
[{name, hoconsc:mk(typerefl:binary())},
{type, #{type => DB}}] ++ Mod:roots().
{type, #{type => DB}}] ++ ConnectModule:roots().

View File

@ -20,6 +20,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%% minirest behaviour callbacks
@ -55,44 +60,34 @@ gateway(get, Request) ->
{200, emqx_gateway_http:gateways(Status)}.
gateway_insta(delete, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:unload(Name) of
ok ->
{204};
{error, not_found} ->
return_http_error(404, <<"Gateway not found">>)
end;
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway:unload(GwName),
{204}
end);
gateway_insta(get, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:lookup(Name) of
#{config := _Config} ->
GwCfs = filled_raw_confs([<<"gateway">>, Name0]),
NGwCfs = GwCfs#{<<"listeners">> =>
emqx_gateway_http:mapping_listener_m2l(
Name0, maps:get(<<"listeners">>, GwCfs, #{})
)
},
{200, NGwCfs};
undefined ->
return_http_error(404, <<"Gateway not found">>)
end;
gateway_insta(put, #{body := RawConfsIn0,
bindings := #{name := Name}
with_gateway(Name0, fun(_, _) ->
GwConf = filled_raw_confs([<<"gateway">>, Name0]),
LisConf = maps:get(<<"listeners">>, GwConf, #{}),
NLisConf = emqx_gateway_http:mapping_listener_m2l(Name0, LisConf),
{200, GwConf#{<<"listeners">> => NLisConf}}
end);
gateway_insta(put, #{body := GwConf0,
bindings := #{name := Name0}
}) ->
RawConfsIn = maps:without([<<"authentication">>,
<<"listeners">>], RawConfsIn0),
%% FIXME: Cluster Consistence ??
case emqx_gateway:update_rawconf(Name, RawConfsIn) of
ok ->
{200};
{error, not_found} ->
return_http_error(404, <<"Gateway not found">>);
{error, Reason} ->
return_http_error(500, Reason)
end.
with_gateway(Name0, fun(_, _) ->
GwConf = maps:without([<<"authentication">>, <<"listeners">>], GwConf0),
case emqx_gateway:update_rawconf(Name0, GwConf) of
ok ->
{200};
{error, not_found} ->
return_http_error(404, "Gateway not found");
{error, Reason} ->
return_http_error(500, Reason)
end
end).
gateway_insta_stats(get, _Req) ->
return_http_error(401, <<"Implement it later (maybe 5.1)">>).
return_http_error(401, "Implement it later (maybe 5.1)").
filled_raw_confs(Path) ->
RawConf = emqx_config:fill_defaults(
@ -131,7 +126,9 @@ swagger("/gateway/:name", get) ->
#{ description => <<"Get the gateway configurations">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_conf()
}
};
@ -139,7 +136,9 @@ swagger("/gateway/:name", delete) ->
#{ description => <<"Delete/Unload the gateway">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@ -148,7 +147,9 @@ swagger("/gateway/:name", put) ->
, parameters => params_gateway_name_in_path()
, requestBody => schema_gateway_conf()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@ -156,7 +157,9 @@ swagger("/gateway/:name/stats", get) ->
#{ description => <<"Get gateway Statistic">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_stats()
}
}.
@ -181,12 +184,6 @@ params_gateway_status_in_qs() ->
%%--------------------------------------------------------------------
%% schemas
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
schema_no_content() ->
#{description => <<"No Content">>}.
schema_gateway_overview_list() ->
emqx_mgmt_util:array_schema(
#{ type => object

View File

@ -36,6 +36,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%%--------------------------------------------------------------------
@ -71,102 +76,103 @@ apis() ->
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
clients(get, #{ bindings := #{name := GwName0}
clients(get, #{ bindings := #{name := Name0}
, query_string := Qs
}) ->
GwName = binary_to_existing_atom(GwName0),
TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Qs, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(
Qs, TabName,
?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Qs),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response}
end.
with_gateway(Name0, fun(GwName, _) ->
TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Qs, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(
Qs, TabName,
?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Qs),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response}
end
end).
clients_insta(get, #{ bindings := #{name := GwName0,
clients_insta(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case emqx_gateway_http:lookup_client(GwName, ClientId,
{?MODULE, format_channel_info}) of
[ClientInfo] ->
{200, ClientInfo};
[ClientInfo|_More] ->
?LOG(warning, "More than one client info was returned on ~s",
[ClientId]),
{200, ClientInfo};
[] ->
return_http_error(404, <<"Gateway or ClientId not found">>)
end;
clients_insta(delete, #{ bindings := #{name := GwName0,
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:lookup_client(GwName, ClientId,
{?MODULE, format_channel_info}) of
[ClientInfo] ->
{200, ClientInfo};
[ClientInfo|_More] ->
?LOG(warning, "More than one client info was returned on ~s",
[ClientId]),
{200, ClientInfo};
[] ->
return_http_error(404, "Client not found")
end
end);
clients_insta(delete, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200}.
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200}
end).
%% FIXME:
%% List the subscription without mountpoint, but has SubOpts,
%% for example, share group ...
subscriptions(get, #{ bindings := #{name := GwName0,
subscriptions(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
{error, Reason} ->
return_http_error(404, Reason);
{ok, Subs} ->
{200, Subs}
end;
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
{error, Reason} ->
return_http_error(500, Reason);
{ok, Subs} ->
{200, Subs}
end
end);
%% Create the subscription without mountpoint
subscriptions(post, #{ bindings := #{name := GwName0,
subscriptions(post, #{ bindings := #{name := Name0,
clientid := ClientId0},
body := Body
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
{undefined, _} ->
%% FIXME: more reasonable error code??
return_http_error(404, <<"Request paramter missed: topic">>);
{Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
{error, Reason} ->
return_http_error(404, Reason);
ok ->
{200}
end
end;
with_gateway(Name0, fun(GwName, _) ->
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
{undefined, _} ->
return_http_error(400, "Miss topic property");
{Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
{error, Reason} ->
return_http_error(404, Reason);
ok ->
{200}
end
end
end);
%% Remove the subscription without mountpoint
subscriptions(delete, #{ bindings := #{name := GwName0,
subscriptions(delete, #{ bindings := #{name := Name0,
clientid := ClientId0,
topic := Topic0
}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
Topic = emqx_mgmt_util:urldecode(Topic0),
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200}.
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200}
end).
%%--------------------------------------------------------------------
%% Utils
@ -379,7 +385,9 @@ swagger("/gateway/:name/clients", get) ->
#{ description => <<"Get the gateway clients">>
, parameters => params_client_query()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_clients_list()
}
};
@ -387,7 +395,9 @@ swagger("/gateway/:name/clients/:clientid", get) ->
#{ description => <<"Get the gateway client infomation">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_client()
}
};
@ -395,7 +405,9 @@ swagger("/gateway/:name/clients/:clientid", delete) ->
#{ description => <<"Kick out the gateway client">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@ -403,7 +415,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", get) ->
#{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_subscription_list()
}
};
@ -412,7 +426,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
, parameters => params_client_insta()
, requestBody => schema_subscription()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@ -420,7 +436,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
#{ description => <<"Unsubscribe the topic for client">>
, parameters => params_topic_name_in_path() ++ params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
}.
@ -483,12 +501,6 @@ queries(Ls) ->
%%--------------------------------------------------------------------
%% schemas
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
schema_no_content() ->
#{description => <<"No Content">>}.
schema_clients_list() ->
emqx_mgmt_util:page_schema(
#{ type => object

View File

@ -0,0 +1,316 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_api_listeners).
-behaviour(minirest_api).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, checks/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%% minirest behaviour callbacks
-export([api_spec/0]).
%% http handlers
-export([ listeners/2
, listeners_insta/2
]).
%%--------------------------------------------------------------------
%% minirest behaviour callbacks
%%--------------------------------------------------------------------
api_spec() ->
{metadata(apis()), []}.
apis() ->
[ {"/gateway/:name/listeners", listeners}
, {"/gateway/:name/listeners/:id", listeners_insta}
].
%%--------------------------------------------------------------------
%% http handlers
listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_http:listeners(GwName)}
end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
with_gateway(Name0, fun(GwName, Gateway) ->
RunningConf = maps:get(config, Gateway),
%% XXX: check params miss? check badly data tpye??
_ = checks([<<"type">>, <<"name">>, <<"bind">>], LConf),
Type = binary_to_existing_atom(maps:get(<<"type">>, LConf)),
LName = binary_to_atom(maps:get(<<"name">>, LConf)),
Path = [listeners, Type, LName],
case emqx_map_lib:deep_get(Path, RunningConf, undefined) of
undefined ->
ListenerId = emqx_gateway_utils:listener_id(
GwName, Type, LName),
case emqx_gateway_http:update_listener(
ListenerId, LConf) of
ok ->
{204};
{error, Reason} ->
return_http_error(500, Reason)
end;
_ ->
return_http_error(400, "Listener name has occupied")
end
end).
listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:remove_listener(ListenerId) of
ok -> {204};
{error, not_found} -> {204};
{error, Reason} ->
return_http_error(500, Reason)
end
end);
listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:listener(ListenerId) of
{ok, Listener} ->
{200, Listener};
{error, not_found} ->
return_http_error(404, "Listener not found");
{error, Reason} ->
return_http_error(500, Reason)
end
end);
listeners_insta(put, #{body := LConf,
bindings := #{name := Name0, id := ListenerId0}
}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:update_listener(ListenerId, LConf) of
ok ->
{204};
{error, Reason} ->
return_http_error(500, Reason)
end
end).
%%--------------------------------------------------------------------
%% Swagger defines
%%--------------------------------------------------------------------
metadata(APIs) ->
metadata(APIs, []).
metadata([], APIAcc) ->
lists:reverse(APIAcc);
metadata([{Path, Fun}|More], APIAcc) ->
Methods = [get, post, put, delete, patch],
Mds = lists:foldl(fun(M, Acc) ->
try
Acc#{M => swagger(Path, M)}
catch
error : function_clause ->
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway/:name/listeners", get) ->
#{ description => <<"Get the gateway listeners">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener_list()
}
};
swagger("/gateway/:name/listeners", post) ->
#{ description => <<"Create the gateway listener">>
, parameters => params_gateway_name_in_path()
, requestBody => schema_listener()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener_list()
}
};
swagger("/gateway/:name/listeners/:id", get) ->
#{ description => <<"Get the gateway listener configurations">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener()
}
};
swagger("/gateway/:name/listeners/:id", delete) ->
#{ description => <<"Delete the gateway listener">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/listeners/:id", put) ->
#{ description => <<"Update the gateway listener">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, requestBody => schema_listener()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
}.
%%--------------------------------------------------------------------
%% params defines
params_gateway_name_in_path() ->
[#{ name => name
, in => path
, schema => #{type => string}
, required => true
}].
params_listener_id_in_path() ->
[#{ name => id
, in => path
, schema => #{type => string}
, required => true
}].
%%--------------------------------------------------------------------
%% schemas
schema_listener_list() ->
emqx_mgmt_util:array_schema(
#{ type => object
, properties => properties_listener()
},
<<"Listener list">>
).
schema_listener() ->
emqx_mgmt_util:schema(
#{ type => object
, properties => properties_listener()
}
).
%%--------------------------------------------------------------------
%% properties
properties_listener() ->
emqx_mgmt_util:properties(
raw_properties_common_listener() ++
[ {tcp, object, raw_properties_tcp_opts()}
, {ssl, object, raw_properties_ssl_opts()}
, {udp, object, raw_properties_udp_opts()}
, {dtls, object, raw_properties_dtls_opts()}
]).
raw_properties_tcp_opts() ->
[ {active_n, integer, <<>>}
, {backlog, integer, <<>>}
, {buffer, string, <<>>}
, {recbuf, string, <<>>}
, {sndbuf, string, <<>>}
, {high_watermark, string, <<>>}
, {nodelay, boolean, <<>>}
, {reuseaddr, boolean, <<>>}
, {send_timeout, string, <<>>}
, {send_timeout_close, boolean, <<>>}
].
raw_properties_ssl_opts() ->
[ {cacertfile, string, <<>>}
, {certfile, string, <<>>}
, {keyfile, string, <<>>}
, {verify, string, <<>>}
, {fail_if_no_peer_cert, boolean, <<>>}
, {server_name_indication, boolean, <<>>}
, {depth, integer, <<>>}
, {password, string, <<>>}
, {handshake_timeout, string, <<>>}
, {versions, {array, string}, <<>>}
, {ciphers, {array, string}, <<>>}
, {user_lookup_fun, string, <<>>}
, {reuse_sessions, boolean, <<>>}
, {secure_renegotiate, boolean, <<>>}
, {honor_cipher_order, boolean, <<>>}
, {dhfile, string, <<>>}
].
raw_properties_udp_opts() ->
[ {active_n, integer, <<>>}
, {buffer, string, <<>>}
, {recbuf, string, <<>>}
, {sndbuf, string, <<>>}
, {reuseaddr, boolean, <<>>}
].
raw_properties_dtls_opts() ->
Ls = lists_key_without(
[versions,ciphers,handshake_timeout], 1,
raw_properties_ssl_opts()
),
[ {versions, {array, string}, <<>>}
, {ciphers, {array, string}, <<>>}
| Ls].
lists_key_without([], _N, L) ->
L;
lists_key_without([K|Ks], N, L) ->
lists_key_without(Ks, N, lists:keydelete(K, N, L)).
raw_properties_common_listener() ->
[ {enable, boolean, <<"Whether to enable this listener">>}
, {id, string, <<"Listener Id">>}
, {name, string, <<"Listener name">>}
, {type, string,
<<"Listener type. Enum: tcp, udp, ssl, dtls">>,
[<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]}
, {running, boolean, <<"Listener running status">>}
%% FIXME:
, {bind, string, <<"Listener bind address or port">>}
, {acceptors, integer, <<"Listener acceptors number">>}
, {access_rules, {array, string}, <<"Listener Access rules for client">>}
, {max_conn_rate, integer, <<"Max connection rate for the listener">>}
, {max_connections, integer, <<"Max connections for the listener">>}
, {mountpoint, string,
<<"The Mounpoint for clients of the listener. "
"The gateway-level mountpoint configuration can be overloaded "
"when it is not null or empty string">>}
%% FIXME:
, {authentication, string, <<"NOT-SUPPORTED-NOW">>}
].

View File

@ -26,7 +26,9 @@
%% Mgmt APIs - listeners
-export([ listeners/1
, listener/2
, listener/1
, remove_listener/1
, update_listener/2
, mapping_listener_m2l/2
]).
@ -42,6 +44,12 @@
%% Utils for http, swagger, etc.
-export([ return_http_error/2
, with_gateway/2
, checks/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
-type gateway_summary() ::
@ -108,7 +116,7 @@ get_listeners_status(GwName, Config) ->
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
Name = {Name0, ListenOn},
LisO = #{id => Name0, type => Type},
LisO = #{id => Name0, type => Type, name => LisName},
case catch esockd:listener(Name) of
_Pid when is_pid(_Pid) ->
LisO#{running => true};
@ -121,7 +129,8 @@ get_listeners_status(GwName, Config) ->
%% Mgmt APIs - listeners
%%--------------------------------------------------------------------
listeners(GwName) when is_atom (GwName) ->
-spec listeners(atom() | binary()) -> list().
listeners(GwName) when is_atom(GwName) ->
listeners(atom_to_binary(GwName));
listeners(GwName) ->
RawConf = emqx_config:fill_defaults(
@ -131,8 +140,27 @@ listeners(GwName) ->
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
mapping_listener_m2l(GwName, Listeners).
listener(_GwName, _ListenerId) ->
ok.
-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}.
listener(ListenerId) ->
{GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
RootConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
try
Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName],
LConf = emqx_map_lib:deep_get(Path, RootConf),
Running = is_running(binary_to_existing_atom(ListenerId), LConf),
{ok, emqx_map_lib:jsonable_map(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running})}
catch
error : {config_not_found, _} ->
{error, not_found};
_Class : Reason ->
{error, Reason}
end.
mapping_listener_m2l(GwName, Listeners0) ->
Listeners = maps:to_list(Listeners0),
@ -146,6 +174,7 @@ listener(GwName, Type, Conf) ->
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running
}
end || {LName, LConf} <- Conf, is_map(LConf)].
@ -159,6 +188,28 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
false
end.
-spec remove_listener(binary()) -> ok | {error, not_found} | {error, any()}.
remove_listener(ListenerId) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
LConf = emqx:get_raw_config(
[<<"gateway">>, GwName, <<"listeners">>, Type]
),
NLConf = maps:remove(Name, LConf),
emqx_gateway:update_rawconf(
GwName,
#{<<"listeners">> => #{Type => NLConf}}
).
-spec update_listener(atom() | binary(), map()) -> ok | {error, any()}.
update_listener(ListenerId, NewConf0) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
NewConf = maps:without([<<"id">>, <<"name">>,
<<"type">>, <<"running">>], NewConf0),
emqx_gateway:update_rawconf(
GwName,
#{<<"listeners">> => #{Type => #{Name => NewConf}}
}).
%%--------------------------------------------------------------------
%% Mgmt APIs - clients
%%--------------------------------------------------------------------
@ -248,7 +299,7 @@ with_channel(GwName, ClientId, Fun) ->
%% Utils
%%--------------------------------------------------------------------
-spec return_http_error(integer(), binary()) -> {integer(), binary()}.
-spec return_http_error(integer(), any()) -> {integer(), binary()}.
return_http_error(Code, Msg) ->
{Code, emqx_json:encode(
#{code => codestr(Code),
@ -256,10 +307,61 @@ return_http_error(Code, Msg) ->
})
}.
codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(400) -> 'BAD_REQUEST';
codestr(401) -> 'NOT_SUPPORTED_NOW';
codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(500) -> 'UNKNOW_ERROR'.
-spec with_gateway(binary(), function()) -> any().
with_gateway(GwName0, Fun) ->
try
GwName = try
binary_to_existing_atom(GwName0)
catch _ : _ -> error(badname)
end,
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, "Gateway not load");
Gateway ->
Fun(GwName, Gateway)
end
catch
error : badname ->
return_http_error(404, "Bad gateway name");
error : {miss_param, K} ->
return_http_error(400, [K, " is required"]);
error : {invalid_listener_id, Id} ->
return_http_error(400, ["invalid listener id: ", Id]);
Class : Reason : Stk ->
?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p",
[Class, Reason, Stk]),
return_http_error(500, {Class, Reason, Stk})
end.
-spec checks(list(), map()) -> ok.
checks([], _) ->
ok;
checks([K|Ks], Map) ->
case maps:is_key(K, Map) of
true -> checks(Ks, Map);
false ->
error({miss_param, K})
end.
%%--------------------------------------------------------------------
%% common schemas
schema_bad_request() ->
emqx_mgmt_util:error_schema(
<<"Some Params missed">>, ['PARAMETER_MISSED']).
schema_internal_error() ->
emqx_mgmt_util:error_schema(
<<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']).
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Resource Not Found">>).
schema_no_content() ->
#{description => <<"No Content">>}.
%%--------------------------------------------------------------------
%% Internal funcs

View File

@ -50,16 +50,16 @@ namespace() -> gateway.
roots() -> [gateway].
fields(gateway) ->
[{stomp, sc(ref(stomp_structs))},
{mqttsn, sc(ref(mqttsn_structs))},
{coap, sc(ref(coap_structs))},
{lwm2m, sc(ref(lwm2m_structs))},
{exproto, sc(ref(exproto_structs))}
[{stomp, sc(ref(stomp))},
{mqttsn, sc(ref(mqttsn))},
{coap, sc(ref(coap))},
{lwm2m, sc(ref(lwm2m))},
{exproto, sc(ref(exproto))}
];
fields(stomp_structs) ->
fields(stomp) ->
[ {frame, sc(ref(stomp_frame))}
, {listeners, sc(ref(tcp_listener_group))}
, {listeners, sc(ref(tcp_listeners))}
] ++ gateway_common_options();
fields(stomp_frame) ->
@ -68,12 +68,12 @@ fields(stomp_frame) ->
, {max_body_length, sc(integer(), 8192)}
];
fields(mqttsn_structs) ->
fields(mqttsn) ->
[ {gateway_id, sc(integer())}
, {broadcast, sc(boolean())}
, {enable_qos3, sc(boolean())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {listeners, sc(ref(udp_listener_group))}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(mqttsn_predefined) ->
@ -81,34 +81,34 @@ fields(mqttsn_predefined) ->
, {topic, sc(binary())}
];
fields(coap_structs) ->
fields(coap) ->
[ {heartbeat, sc(duration(), <<"30s">>)}
, {connection_required, sc(boolean(), false)}
, {notify_type, sc(union([non, con, qos]), qos)}
, {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
, {publish_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
, {listeners, sc(ref(udp_listener_group))}
, {notify_type, sc(hoconsc:union([non, con, qos]), qos)}
, {subscribe_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
, {publish_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(lwm2m_structs) ->
fields(lwm2m) ->
[ {xml_dir, sc(binary())}
, {lifetime_min, sc(duration())}
, {lifetime_max, sc(duration())}
, {qmode_time_windonw, sc(integer())}
, {auto_observe, sc(boolean())}
, {update_msg_publish_condition, sc(union([always, contains_object_list]))}
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc(ref(translators))}
, {listeners, sc(ref(udp_listener_group))}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(exproto_structs) ->
fields(exproto) ->
[ {server, sc(ref(exproto_grpc_server))}
, {handler, sc(ref(exproto_grpc_handler))}
, {listeners, sc(ref(udp_tcp_listener_group))}
, {listeners, sc(ref(udp_tcp_listeners))}
] ++ gateway_common_options();
fields(exproto_grpc_server) ->
[ {bind, sc(union(ip_port(), integer()))}
[ {bind, sc(hoconsc:union([ip_port(), integer()]))}
%% TODO: ssl options
];
@ -136,62 +136,45 @@ fields(translator) ->
, {qos, sc(range(0, 2))}
];
fields(udp_listener_group) ->
[ {udp, sc(ref(udp_listener))}
, {dtls, sc(ref(dtls_listener))}
fields(udp_listeners) ->
[ {udp, sc(map(name, ref(udp_listener)))}
, {dtls, sc(map(name, ref(dtls_listener)))}
];
fields(tcp_listener_group) ->
[ {tcp, sc(ref(tcp_listener))}
, {ssl, sc(ref(ssl_listener))}
fields(tcp_listeners) ->
[ {tcp, sc(map(name, ref(tcp_listener)))}
, {ssl, sc(map(name, ref(ssl_listener)))}
];
fields(udp_tcp_listener_group) ->
[ {udp, sc(ref(udp_listener))}
, {dtls, sc(ref(dtls_listener))}
, {tcp, sc(ref(tcp_listener))}
, {ssl, sc(ref(ssl_listener))}
fields(udp_tcp_listeners) ->
[ {udp, sc(map(name, ref(udp_listener)))}
, {dtls, sc(map(name, ref(dtls_listener)))}
, {tcp, sc(map(name, ref(tcp_listener)))}
, {ssl, sc(map(name, ref(ssl_listener)))}
];
fields(tcp_listener) ->
[ {"$name", sc(ref(tcp_listener_settings))}];
fields(ssl_listener) ->
[ {"$name", sc(ref(ssl_listener_settings))}];
fields(udp_listener) ->
[ {"$name", sc(ref(udp_listener_settings))}];
fields(dtls_listener) ->
[ {"$name", sc(ref(dtls_listener_settings))}];
fields(tcp_listener_settings) ->
[
%% some special confs for tcp listener
] ++ tcp_opts()
++ proxy_protocol_opts()
++ common_listener_opts();
] ++
tcp_opts() ++
proxy_protocol_opts() ++
common_listener_opts();
fields(ssl_listener_settings) ->
[
%% some special confs for ssl listener
] ++ tcp_opts()
++ ssl_opts()
++ proxy_protocol_opts()
++ common_listener_opts();
fields(ssl_listener) ->
fields(tcp_listener) ++
ssl_opts();
fields(udp_listener_settings) ->
fields(udp_listener) ->
[
%% some special confs for udp listener
] ++ udp_opts()
++ common_listener_opts();
] ++
udp_opts() ++
common_listener_opts();
fields(dtls_listener_settings) ->
[
%% some special confs for dtls listener
] ++ udp_opts()
++ dtls_opts()
++ common_listener_opts();
fields(dtls_listener) ->
fields(udp_listener) ++
dtls_opts();
fields(udp_opts) ->
[ {active_n, sc(integer(), 100)}
@ -218,11 +201,7 @@ fields(dtls_listener_ssl_opts) ->
lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}),
{"ciphers", Ciphers}
)
);
fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField).
).
default_ciphers() ->
["ECDHE-ECDSA-AES256-GCM-SHA384",
@ -286,16 +265,16 @@ common_listener_opts() ->
].
tcp_opts() ->
[{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}].
[{tcp, sc_meta(ref(emqx_schema, "tcp_opts"), #{})}].
udp_opts() ->
[{udp, sc(ref(udp_opts), #{})}].
[{udp, sc_meta(ref(udp_opts), #{})}].
ssl_opts() ->
[{ssl, sc(ref(emqx_schema, "listener_ssl_opts"), #{})}].
[{ssl, sc_meta(ref(emqx_schema, "listener_ssl_opts"), #{})}].
dtls_opts() ->
[{dtls, sc(ref(dtls_listener_ssl_opts), #{})}].
[{dtls, sc_meta(ref(dtls_listener_ssl_opts), #{})}].
proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean())}
@ -308,18 +287,20 @@ default_dtls_vsns() ->
dtls_vsn(<<"dtlsv1.2">>) -> 'dtlsv1.2';
dtls_vsn(<<"dtlsv1">>) -> 'dtlsv1'.
%%--------------------------------------------------------------------
%% Helpers
%% types
sc(Type) -> #{type => Type}.
sc(Type) ->
sc_meta(Type, #{}).
sc(Type, Default) ->
hoconsc:mk(Type, #{default => Default}).
sc_meta(Type, #{default => Default}).
ref(Field) ->
hoconsc:ref(?MODULE, Field).
sc_meta(Type, Meta) ->
hoconsc:mk(Type, Meta).
map(Name, Type) ->
hoconsc:map(Name, Type).
ref(StructName) ->
ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

View File

@ -133,11 +133,12 @@ listener_id(GwName, Type, LisName) ->
(bin(LisName))/binary
>>).
parse_listener_id(Id) when is_atom(Id) ->
parse_listener_id(atom_to_binary(Id));
parse_listener_id(Id) ->
try
[GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]),
{binary_to_existing_atom(GwName), binary_to_existing_atom(Type),
binary_to_atom(Name)}
{GwName, Type, Name}
catch
_ : _ -> error({invalid_listener_id, Id})
end.
@ -161,6 +162,8 @@ unix_ts_to_rfc3339(Ts) ->
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>).
-spec stringfy(term()) -> binary().
stringfy(T) when is_list(T); is_binary(T) ->
iolist_to_binary(T);
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).

View File

@ -100,13 +100,15 @@ t_lookup_cmd_read(Config) ->
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~s&lt=345&lwm2m=1", [?PORT, Epn]),
sprintf("coap://127.0.0.1:~b/rd?ep=~s&lt=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
timer:sleep(100),
test_recv_mqtt_response(RespTopic),
%% step2, send a READ command to device
@ -122,8 +124,8 @@ t_lookup_cmd_read(Config) ->
CommandJson = emqx_json:encode(Command),
?LOGT("CommandJson=~p", [CommandJson]),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
timer:sleep(50),
timer:sleep(200),
no_received_request(Epn, <<"/3/0/0">>, <<"read">>),
Request2 = test_recv_coap_request(UdpSock),
@ -131,8 +133,8 @@ t_lookup_cmd_read(Config) ->
timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true),
timer:sleep(100),
timer:sleep(200),
normal_received_request(Epn, <<"/3/0/0">>, <<"read">>).
t_lookup_cmd_discover(Config) ->
@ -158,6 +160,7 @@ t_lookup_cmd_discover(Config) ->
CommandJson = emqx_json:encode(Command),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
timer:sleep(200),
no_received_request(Epn, <<"/3/0/7">>, <<"discover">>),
timer:sleep(50),
@ -172,7 +175,7 @@ t_lookup_cmd_discover(Config) ->
#coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover},
Request2,
true),
timer:sleep(100),
timer:sleep(200),
discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -41,6 +41,9 @@ node {
## Default: 15m
global_gc_interval = 15m
## Sets the etc directory
etc_dir = "{{ platform_etc_dir }}"
## Sets the net_kernel tick time in seconds.
## Notice that all communicating nodes are to have the same
## TickTime value specified.

View File

@ -42,8 +42,7 @@
%% The list can not be made a dynamic read at run-time as it is used
%% by nodetool to generate app.<time>.config before EMQ X is started
-define(MERGED_CONFIGS,
[ emqx_schema
, emqx_bridge_schema
[ emqx_bridge_schema
, emqx_retainer_schema
, emqx_statsd_schema
, emqx_authz_schema
@ -59,13 +58,42 @@
namespace() -> undefined.
roots() ->
%% This is a temp workaround to define part of authorization config
%% in emqx_schema and part of it in emqx_authz_schema but then
%% merged here in this module
%% The proper fix should be to make connection (channel, session) state
%% extendable by e.g. allow hooks be stateful.
["cluster", "node", "rpc", "log", "authorization"] ++
lists:keydelete("authorization", 1, lists:flatmap(fun roots/1, ?MERGED_CONFIGS)).
%% authorization configs are merged in THIS schema's "authorization" fields
lists:keydelete("authorization", 1, emqx_schema:roots(high)) ++
[ {"node",
sc(hoconsc:ref("node"),
#{ desc => "Node name, cookie, config & data directories "
"and the Eralng virtual machine (beam) boot parameters."
})}
, {"cluster",
sc(hoconsc:ref("cluster"),
#{ desc => "EMQ X nodes can form a cluster to scale up the total capacity.<br>"
"Here holds the configs to instruct how individual nodes "
"can discover each other, also the database replication "
"role of this node etc."
})}
, {"log",
sc(hoconsc:ref("log"),
#{ desc => "Configure logging backends (to console or to file), "
"and logging level for each logger backend."
})}
, {"rpc",
sc(hoconsc:ref("rpc"),
#{ desc => "EMQ X uses a library called <code>gen_rpc</code> for "
"inter-broker RPCs.<br>Most of the time the default config "
"should work, but in case you need to do performance "
"fine-turning or experiment a bit, this is where to look."
})}
, {"authorization",
sc(hoconsc:ref("authorization"),
#{ desc => "In EMQ X, MQTT client access control can be just a few "
"lines of text based rules, or delegated to an external "
"HTTP API, or base externa database query results."
})}
] ++
emqx_schema:roots(medium) ++
emqx_schema:roots(low) ++
lists:flatmap(fun roots/1, ?MERGED_CONFIGS).
fields("cluster") ->
[ {"name",
@ -282,6 +310,27 @@ fields("node") ->
sc(ref("cluster_call"),
#{}
)}
, {"etc_dir",
sc(string(),
#{
converter => fun(EtcDir) ->
case filename:absname(EtcDir) =:= EtcDir of
true ->
unicode:characters_to_list(EtcDir);
false ->
unicode:characters_to_list(filename:join([code:lib_dir(), "..", EtcDir]))
end
end,
validator => fun(Path) ->
case filelib:is_dir(Path) of
true ->
ok;
false ->
error({not_dir, Path})
end
end
}
)}
];
fields("cluster_call") ->
@ -381,7 +430,7 @@ fields("rpc") ->
fields("log") ->
[ {"console_handler", ref("console_handler")}
, {"file_handlers",
sc(ref("file_handlers"),
sc(map(name, ref("log_file_handler")),
#{})}
, {"error_logger",
sc(atom(),
@ -396,12 +445,6 @@ fields("console_handler") ->
})}
] ++ log_handler_common_confs();
fields("file_handlers") ->
[ {"$name",
sc(ref("log_file_handler"),
#{})}
];
fields("log_file_handler") ->
[ {"file",
sc(file(),
@ -701,6 +744,8 @@ keys(Parent, Conf) ->
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
map(Name, Type) -> hoconsc:map(Name, Type).
ref(Field) -> hoconsc:ref(?MODULE, Field).
options(static, Conf) ->
@ -742,5 +787,4 @@ to_atom(Bin) when is_binary(Bin) ->
binary_to_atom(Bin, utf8).
roots(Module) ->
lists:map(fun({_BinName, Root}) -> Root end,
hocon_schema:roots(Module)).
lists:map(fun({_BinName, Root}) -> Root end, hocon_schema:roots(Module)).

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_id).
-module(emqx_plugin_libs_id).
-export([gen/0, gen/1]).

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_id_SUITE).
-module(emqx_plugin_libs_id_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -24,5 +24,5 @@
all() -> emqx_ct:all(?MODULE).
t_gen(_) ->
?assertEqual(10, length(emqx_rule_id:gen(10))),
?assertEqual(20, length(emqx_rule_id:gen(20))).
?assertEqual(10, length(emqx_plugin_libs_id:gen(10))),
?assertEqual(20, length(emqx_plugin_libs_id:gen(20))).

View File

@ -26,7 +26,7 @@ It is intended to be used by the emqx_bridges and all other resources that need
# The Demo
The data_bridge for mysql
The bridge for mysql
---
## The callback module 'emqx_mysql_connector'

View File

@ -507,7 +507,7 @@ rule_id() ->
gen_id("rule:", fun emqx_rule_registry:get_rule/1).
gen_id(Prefix, TestFun) ->
Id = iolist_to_binary([Prefix, emqx_rule_id:gen()]),
Id = iolist_to_binary([Prefix, emqx_plugin_libs_id:gen()]),
case TestFun(Id) of
not_found -> Id;
_Res -> gen_id(Prefix, TestFun)

View File

@ -48,8 +48,8 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
end.
test_rule(Sql, Select, Context, EventTopics) ->
RuleId = iolist_to_binary(["test_rule", emqx_rule_id:gen()]),
ActInstId = iolist_to_binary(["test_action", emqx_rule_id:gen()]),
RuleId = iolist_to_binary(["test_rule", emqx_plugin_libs_id:gen()]),
ActInstId = iolist_to_binary(["test_action", emqx_plugin_libs_id:gen()]),
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
ok = emqx_rule_metrics:create_metrics(ActInstId),
Rule = #rule{

View File

@ -14,6 +14,11 @@ ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
# shellcheck disable=SC1090
. "$ROOT_DIR"/releases/emqx_vars
# defined in emqx_vars
export RUNNER_ROOT_DIR
export RUNNER_ETC_DIR
export REL_VSN
RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"
REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN"
@ -133,7 +138,6 @@ check_user() {
fi
}
# Make sure the user running this script is the owner and/or su to that user
check_user "$@"
ES=$?
@ -198,18 +202,12 @@ relx_gen_id() {
# Control a node
relx_nodetool() {
command="$1"; shift
export RUNNER_ROOT_DIR
export REL_VSN
ERL_FLAGS="$ERL_FLAGS $EPMD_ARG" \
"$ERTS_DIR/bin/escript" "$ROOTDIR/bin/nodetool" "$NAME_TYPE" "$NAME" \
-setcookie "$COOKIE" "$command" "$@"
}
call_hocon() {
export RUNNER_ROOT_DIR
export RUNNER_ETC_DIR
export REL_VSN
"$ERTS_DIR/bin/escript" "$ROOTDIR/bin/nodetool" hocon "$@" \
|| die "call_hocon_failed: $*" $?
}
@ -217,8 +215,6 @@ call_hocon() {
# Run an escript in the node's environment
relx_escript() {
shift; scriptpath="$1"; shift
export RUNNER_ROOT_DIR
"$ERTS_DIR/bin/escript" "$ROOTDIR/$scriptpath" "$@"
}
@ -365,18 +361,14 @@ esac
## or long name (with '@') e.g. 'emqx@example.net' or 'emqx@127.0.0.1'
NAME="${EMQX_NODE_NAME:-}"
if [ -z "$NAME" ]; then
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
# for boot commands, inspect emqx.conf for node name
NAME="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
else
# for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $CONFIGS_DIR/vm.*.args | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echoerr "no_vm_arg_file_found_for $1 in $CONFIGS_DIR/"
exit 1
fi
NAME="$(grep -E '^-s?name' "$LATEST_VM_ARGS" | awk '{print $2}')"
else
# for boot commands, inspect emqx.conf for node name
NAME="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
fi
fi
@ -402,10 +394,6 @@ if [ -z "$COOKIE" ]; then
else
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $CONFIGS_DIR/vm.*.args | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echoerr "no_vm_arg_file_found_for $1 in $CONFIGS_DIR/"
exit 1
fi
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS" | awk '{print $2}')"
fi
fi

View File

@ -117,40 +117,43 @@ do(Args) ->
io:format("~p\n", [Other])
end;
["eval" | ListOfArgs] ->
% shells may process args into more than one, and end up stripping
% spaces, so this converts all of that to a single string to parse
String = binary_to_list(
list_to_binary(
join(ListOfArgs," ")
)
),
% then just as a convenience to users, if they forgot a trailing
% '.' add it for them.
Normalized =
case lists:reverse(String) of
[$. | _] -> String;
R -> lists:reverse([$. | R])
end,
% then scan and parse the string
{ok, Scanned, _} = erl_scan:string(Normalized),
{ok, Parsed } = erl_parse:parse_exprs(Scanned),
Parsed = parse_eval_args(ListOfArgs),
% and evaluate it on the remote node
case rpc:call(TargetNode, erl_eval, exprs, [Parsed, [] ]) of
{value, Value, _} ->
io:format ("~p\n",[Value]);
io:format ("~p~n",[Value]);
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
halt(1)
end;
Other ->
io:format("Other: ~p\n", [Other]),
io:format("Usage: nodetool {genconfig, chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval [Terms]} [RPC]\n")
io:format("Other: ~p~n", [Other]),
io:format("Usage: nodetool chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n")
end,
net_kernel:stop().
parse_eval_args(Args) ->
% shells may process args into more than one, and end up stripping
% spaces, so this converts all of that to a single string to parse
String = binary_to_list(
list_to_binary(
join(Args," ")
)
),
% then just as a convenience to users, if they forgot a trailing
% '.' add it for them.
Normalized =
case lists:reverse(String) of
[$. | _] -> String;
R -> lists:reverse([$. | R])
end,
% then scan and parse the string
{ok, Scanned, _} = erl_scan:string(Normalized),
{ok, Parsed } = erl_parse:parse_exprs(Scanned),
Parsed.
do_with_ret(Args, Name, Handler) ->
{arity, Arity} = erlang:fun_info(Handler, arity),
case take_args(Args, Name, Arity) of

18
build
View File

@ -62,13 +62,29 @@ log() {
echo "===< $msg"
}
docgen() {
local conf_doc_html libs_dir1 libs_dir2
conf_doc_html="$(pwd)/_build/${PROFILE}/rel/emqx/etc/emqx-config-doc.html"
echo "===< Generating config document $conf_doc_html"
libs_dir1="$(find "_build/default/lib/" -maxdepth 2 -name ebin -type d)"
libs_dir2="$(find "_build/$PROFILE/lib/" -maxdepth 2 -name ebin -type d)"
# shellcheck disable=SC2086
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_html', hocon_schema_html:gen(emqx_machine_schema, \"EMQ X ${PKG_VSN}\")), halt(0)."
local conf_doc_markdown
conf_doc_markdown="$(pwd)/_build/${PROFILE}/rel/emqx/etc/emqx-config-doc.md"
echo "===< Generating config document $conf_doc_markdown"
# shellcheck disable=SC2086
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_markdown', hocon_schema_doc:gen(emqx_machine_schema)), halt(0)."
}
make_rel() {
# shellcheck disable=SC1010
./rebar3 as "$PROFILE" do release,tar
if [ "$(find "_build/$PROFILE/rel/emqx/lib/" -name 'gpb-*' -type d)" != "" ]; then
if [ "$(find "_build/$PROFILE/rel/emqx/lib/" -maxdepth 1 -name 'gpb-*' -type d)" != "" ]; then
echo "gpb should not be included in the release"
exit 1
fi
docgen
}
## unzip previous version .zip files to _build/$PROFILE/rel/emqx/releases before making relup

View File

@ -60,7 +60,7 @@
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}