Merge branch 'master' into dev/ee5.0
This commit is contained in:
commit
3fe76446bc
|
@ -23,6 +23,7 @@
|
|||
**‼️ Note** : The previous API only returns array: `[RuleObj1,RuleObj2]`, after updating, it will become
|
||||
`{"data": [RuleObj1,RuleObj2], "meta":{"count":2, "limit":100, "page":1}`,
|
||||
which will carry the paging meta information.
|
||||
* Fix the issue that webhook leaks TCP connections. [ehttpc#34](https://github.com/emqx/ehttpc/pull/34), [#8580](https://github.com/emqx/emqx/pull/8580)
|
||||
|
||||
## Enhancements
|
||||
|
||||
|
@ -31,6 +32,8 @@
|
|||
* Remove `/configs/listeners` API, use `/listeners/` instead. [#8485](https://github.com/emqx/emqx/pull/8485)
|
||||
* Optimize performance of builtin database operations in processes with long message queue [#8439](https://github.com/emqx/emqx/pull/8439)
|
||||
* Improve authentication tracing. [#8554](https://github.com/emqx/emqx/pull/8554)
|
||||
* Standardize the '/listeners' and `/gateway/<name>/listeners` API fields.
|
||||
It will introduce some incompatible updates, see [#8571](https://github.com/emqx/emqx/pull/8571)
|
||||
|
||||
# 5.0.3
|
||||
|
||||
|
|
3
Makefile
3
Makefile
|
@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-d
|
|||
export EMQX_DEFAULT_RUNNER = debian:11-slim
|
||||
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
|
||||
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
|
||||
export EMQX_DASHBOARD_VERSION ?= v1.0.5-beta.1
|
||||
export EMQX_DASHBOARD_VERSION ?= v1.0.5
|
||||
export EMQX_REL_FORM ?= tgz
|
||||
export QUICER_DOWNLOAD_FROM_RELEASE = 1
|
||||
ifeq ($(OS),Windows_NT)
|
||||
|
@ -249,3 +249,4 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt))))
|
|||
fmt: $(REBAR)
|
||||
@./scripts/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}'
|
||||
@./scripts/erlfmt -w 'rebar.config.erl'
|
||||
@mix format
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
{emqx_gateway_cm,1}.
|
||||
{emqx_gateway_http,1}.
|
||||
{emqx_license,1}.
|
||||
{emqx_license,2}.
|
||||
{emqx_management,1}.
|
||||
{emqx_management,2}.
|
||||
{emqx_mgmt_api_plugins,1}.
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.3"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.2"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.3"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.29.0"}}},
|
||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||
|
|
|
@ -54,7 +54,7 @@
|
|||
|
||||
-export([pre_config_update/3, post_config_update/5]).
|
||||
|
||||
-export([format_addr/1]).
|
||||
-export([format_bind/1]).
|
||||
|
||||
-define(CONF_KEY_PATH, [listeners, '?', '?']).
|
||||
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
|
||||
|
@ -201,14 +201,14 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
?tp(listener_started, #{type => Type, bind => Bind}),
|
||||
console_print(
|
||||
"Listener ~ts on ~ts started.~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
||||
[listener_id(Type, ListenerName), format_bind(Bind)]
|
||||
),
|
||||
ok;
|
||||
{error, {already_started, Pid}} ->
|
||||
{error, {already_started, Pid}};
|
||||
{error, Reason} ->
|
||||
ListenerId = listener_id(Type, ListenerName),
|
||||
BindStr = format_addr(Bind),
|
||||
BindStr = format_bind(Bind),
|
||||
?ELOG(
|
||||
"Failed to start listener ~ts on ~ts: ~0p.~n",
|
||||
[ListenerId, BindStr, Reason]
|
||||
|
@ -261,19 +261,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
ok ->
|
||||
console_print(
|
||||
"Listener ~ts on ~ts stopped.~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
||||
[listener_id(Type, ListenerName), format_bind(Bind)]
|
||||
),
|
||||
ok;
|
||||
{error, not_found} ->
|
||||
?ELOG(
|
||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), already_stopped]
|
||||
[listener_id(Type, ListenerName), format_bind(Bind), already_stopped]
|
||||
),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?ELOG(
|
||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]
|
||||
[listener_id(Type, ListenerName), format_bind(Bind), Reason]
|
||||
),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
@ -492,17 +492,32 @@ merge_default(Options) ->
|
|||
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
|
||||
end.
|
||||
|
||||
format_addr(Port) when is_integer(Port) ->
|
||||
io_lib:format("~w", [Port]);
|
||||
-spec format_bind(
|
||||
integer() | {tuple(), integer()} | string() | binary()
|
||||
) -> io_lib:chars().
|
||||
format_bind(Port) when is_integer(Port) ->
|
||||
io_lib:format(":~w", [Port]);
|
||||
%% Print only the port number when bound on all interfaces
|
||||
format_addr({{0, 0, 0, 0}, Port}) ->
|
||||
format_addr(Port);
|
||||
format_addr({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
|
||||
format_addr(Port);
|
||||
format_addr({Addr, Port}) when is_list(Addr) ->
|
||||
format_bind({{0, 0, 0, 0}, Port}) ->
|
||||
format_bind(Port);
|
||||
format_bind({{0, 0, 0, 0, 0, 0, 0, 0}, Port}) ->
|
||||
format_bind(Port);
|
||||
format_bind({Addr, Port}) when is_list(Addr) ->
|
||||
io_lib:format("~ts:~w", [Addr, Port]);
|
||||
format_addr({Addr, Port}) when is_tuple(Addr) ->
|
||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
||||
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 4 ->
|
||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
|
||||
format_bind({Addr, Port}) when is_tuple(Addr), tuple_size(Addr) == 8 ->
|
||||
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]);
|
||||
%% Support string, binary type for Port or IP:Port
|
||||
format_bind(Str) when is_list(Str) ->
|
||||
case emqx_schema:to_ip_port(Str) of
|
||||
{ok, {Ip, Port}} ->
|
||||
format_bind({Ip, Port});
|
||||
{error, _} ->
|
||||
format_bind(list_to_integer(Str))
|
||||
end;
|
||||
format_bind(Bin) when is_binary(Bin) ->
|
||||
format_bind(binary_to_list(Bin)).
|
||||
|
||||
listener_id(Type, ListenerName) ->
|
||||
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
|
||||
|
|
|
@ -595,6 +595,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
|||
EnvHandler = maps:get(env_handler, Opts, fun(_) -> ok end),
|
||||
ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true),
|
||||
LoadSchema = maps:get(load_schema, Opts, true),
|
||||
SchemaMod = maps:get(schema_mod, Opts, emqx_schema),
|
||||
LoadApps = maps:get(load_apps, Opts, [gen_rpc, emqx, ekka, mria] ++ Apps),
|
||||
Env = maps:get(env, Opts, []),
|
||||
Conf = maps:get(conf, Opts, []),
|
||||
|
@ -630,7 +631,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
|||
%% Otherwise, configuration get's loaded and all preset env in envhandler is lost
|
||||
LoadSchema andalso
|
||||
begin
|
||||
emqx_config:init_load(emqx_schema),
|
||||
emqx_config:init_load(SchemaMod),
|
||||
application:set_env(emqx, init_config_load_done, true)
|
||||
end,
|
||||
|
||||
|
|
|
@ -127,6 +127,17 @@ HTTP 请求的正文。</br>
|
|||
}
|
||||
}
|
||||
|
||||
config_max_retries {
|
||||
desc {
|
||||
en: """HTTP request max retry times if failed."""
|
||||
zh: """HTTP 请求失败最大重试次数"""
|
||||
}
|
||||
label: {
|
||||
en: "HTTP Request Max Retries"
|
||||
zh: "HTTP 请求重试次数"
|
||||
}
|
||||
}
|
||||
|
||||
desc_type {
|
||||
desc {
|
||||
en: """The Bridge Type"""
|
||||
|
|
|
@ -233,7 +233,6 @@ info_example_basic(webhook, _) ->
|
|||
request_timeout => <<"15s">>,
|
||||
connect_timeout => <<"15s">>,
|
||||
max_retries => 3,
|
||||
retry_interval => <<"10s">>,
|
||||
pool_type => <<"random">>,
|
||||
pool_size => 4,
|
||||
enable_pipelining => 100,
|
||||
|
|
|
@ -246,7 +246,8 @@ parse_confs(
|
|||
method := Method,
|
||||
body := Body,
|
||||
headers := Headers,
|
||||
request_timeout := ReqTimeout
|
||||
request_timeout := ReqTimeout,
|
||||
max_retries := Retry
|
||||
} = Conf
|
||||
) when Type == webhook orelse Type == <<"webhook">> ->
|
||||
{BaseUrl, Path} = parse_url(Url),
|
||||
|
@ -259,7 +260,8 @@ parse_confs(
|
|||
method => Method,
|
||||
body => Body,
|
||||
headers => Headers,
|
||||
request_timeout => ReqTimeout
|
||||
request_timeout => ReqTimeout,
|
||||
max_retries => Retry
|
||||
}
|
||||
};
|
||||
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
||||
|
|
|
@ -14,7 +14,46 @@ namespace() -> "bridge".
|
|||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
basic_config() ++
|
||||
basic_config() ++ request_config();
|
||||
fields("post") ->
|
||||
[
|
||||
type_field(),
|
||||
name_field()
|
||||
] ++ fields("config");
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
basic_config() ->
|
||||
[
|
||||
{enable,
|
||||
mk(
|
||||
boolean(),
|
||||
#{
|
||||
desc => ?DESC("config_enable"),
|
||||
default => true
|
||||
}
|
||||
)},
|
||||
{direction,
|
||||
mk(
|
||||
egress,
|
||||
#{
|
||||
desc => ?DESC("config_direction"),
|
||||
default => egress
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||
|
||||
request_config() ->
|
||||
[
|
||||
{url,
|
||||
mk(
|
||||
|
@ -59,6 +98,14 @@ fields("config") ->
|
|||
desc => ?DESC("config_body")
|
||||
}
|
||||
)},
|
||||
{max_retries,
|
||||
mk(
|
||||
non_neg_integer(),
|
||||
#{
|
||||
default => 2,
|
||||
desc => ?DESC("config_max_retries")
|
||||
}
|
||||
)},
|
||||
{request_timeout,
|
||||
mk(
|
||||
emqx_schema:duration_ms(),
|
||||
|
@ -67,44 +114,7 @@ fields("config") ->
|
|||
desc => ?DESC("config_request_timeout")
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields("post") ->
|
||||
[
|
||||
type_field(),
|
||||
name_field()
|
||||
] ++ fields("config");
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
basic_config() ->
|
||||
[
|
||||
{enable,
|
||||
mk(
|
||||
boolean(),
|
||||
#{
|
||||
desc => ?DESC("config_enable"),
|
||||
default => true
|
||||
}
|
||||
)},
|
||||
{direction,
|
||||
mk(
|
||||
egress,
|
||||
#{
|
||||
desc => ?DESC("config_direction"),
|
||||
default => egress
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||
].
|
||||
|
||||
%%======================================================================================
|
||||
|
||||
|
|
|
@ -41,17 +41,6 @@ base URL 只包含host和port。</br>
|
|||
}
|
||||
}
|
||||
|
||||
retry_interval {
|
||||
desc {
|
||||
en: "Interval between retries."
|
||||
zh: "重试之间的间隔时间。"
|
||||
}
|
||||
label: {
|
||||
en: "Retry Interval"
|
||||
zh: "重试间隔"
|
||||
}
|
||||
}
|
||||
|
||||
pool_type {
|
||||
desc {
|
||||
en: "The type of the pool. Can be one of `random`, `hash`."
|
||||
|
|
|
@ -88,22 +88,6 @@ fields(config) ->
|
|||
desc => ?DESC("connect_timeout")
|
||||
}
|
||||
)},
|
||||
{max_retries,
|
||||
sc(
|
||||
non_neg_integer(),
|
||||
#{
|
||||
default => 5,
|
||||
desc => ?DESC("max_retries")
|
||||
}
|
||||
)},
|
||||
{retry_interval,
|
||||
sc(
|
||||
emqx_schema:duration(),
|
||||
#{
|
||||
default => "1s",
|
||||
desc => ?DESC("retry_interval")
|
||||
}
|
||||
)},
|
||||
{pool_type,
|
||||
sc(
|
||||
pool_type(),
|
||||
|
@ -147,6 +131,14 @@ fields("request") ->
|
|||
{path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})},
|
||||
{body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})},
|
||||
{headers, hoconsc:mk(map(), #{required => false, desc => ?DESC("headers")})},
|
||||
{max_retries,
|
||||
sc(
|
||||
non_neg_integer(),
|
||||
#{
|
||||
required => false,
|
||||
desc => ?DESC("max_retries")
|
||||
}
|
||||
)},
|
||||
{request_timeout,
|
||||
sc(
|
||||
emqx_schema:duration_ms(),
|
||||
|
@ -182,8 +174,6 @@ on_start(
|
|||
path := BasePath
|
||||
},
|
||||
connect_timeout := ConnectTimeout,
|
||||
max_retries := MaxRetries,
|
||||
retry_interval := RetryInterval,
|
||||
pool_type := PoolType,
|
||||
pool_size := PoolSize
|
||||
} = Config
|
||||
|
@ -206,8 +196,6 @@ on_start(
|
|||
{host, Host},
|
||||
{port, Port},
|
||||
{connect_timeout, ConnectTimeout},
|
||||
{retry, MaxRetries},
|
||||
{retry_timeout, RetryInterval},
|
||||
{keepalive, 30000},
|
||||
{pool_type, PoolType},
|
||||
{pool_size, PoolSize},
|
||||
|
@ -247,17 +235,23 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
|
|||
path := Path,
|
||||
body := Body,
|
||||
headers := Headers,
|
||||
request_timeout := Timeout
|
||||
request_timeout := Timeout,
|
||||
max_retries := Retry
|
||||
} = process_request(Request, Msg),
|
||||
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
|
||||
on_query(
|
||||
InstId,
|
||||
{undefined, Method, {Path, Headers, Body}, Timeout, Retry},
|
||||
AfterQuery,
|
||||
State
|
||||
)
|
||||
end;
|
||||
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
||||
on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
|
||||
on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
|
||||
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
||||
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
||||
on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
|
||||
on_query(
|
||||
InstId,
|
||||
{KeyOrNum, Method, Request, Timeout},
|
||||
{KeyOrNum, Method, Request, Timeout, Retry},
|
||||
AfterQuery,
|
||||
#{pool_name := PoolName, base_path := BasePath} = State
|
||||
) ->
|
||||
|
@ -275,7 +269,8 @@ on_query(
|
|||
end,
|
||||
Method,
|
||||
NRequest,
|
||||
Timeout
|
||||
Timeout,
|
||||
Retry
|
||||
)
|
||||
of
|
||||
{error, Reason} ->
|
||||
|
@ -368,7 +363,8 @@ preprocess_request(
|
|||
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
||||
body => emqx_plugin_libs_rule:preproc_tmpl(Body),
|
||||
headers => preproc_headers(Headers),
|
||||
request_timeout => maps:get(request_timeout, Req, 30000)
|
||||
request_timeout => maps:get(request_timeout, Req, 30000),
|
||||
max_retries => maps:get(max_retries, Req, 2)
|
||||
}.
|
||||
|
||||
preproc_headers(Headers) when is_map(Headers) ->
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
dashboard {
|
||||
listeners.http {
|
||||
bind: 18083
|
||||
bind = 18083
|
||||
}
|
||||
default_username: "admin"
|
||||
default_password: "public"
|
||||
default_username = "admin"
|
||||
default_password = "public"
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ start_listeners(Listeners) ->
|
|||
case minirest:start(Name, RanchOptions, Minirest) of
|
||||
{ok, _} ->
|
||||
?ULOG("Listener ~ts on ~ts started.~n", [
|
||||
Name, emqx_listeners:format_addr(Bind)
|
||||
Name, emqx_listeners:format_bind(Bind)
|
||||
]),
|
||||
Acc;
|
||||
{error, _Reason} ->
|
||||
|
@ -114,7 +114,7 @@ stop_listeners(Listeners) ->
|
|||
case minirest:stop(Name) of
|
||||
ok ->
|
||||
?ULOG("Stop listener ~ts on ~ts successfully.~n", [
|
||||
Name, emqx_listeners:format_addr(Port)
|
||||
Name, emqx_listeners:format_bind(Port)
|
||||
]);
|
||||
{error, not_found} ->
|
||||
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
|
||||
|
|
|
@ -112,6 +112,13 @@ emqx_gateway_api_listeners {
|
|||
}
|
||||
}
|
||||
|
||||
listener_status {
|
||||
desc {
|
||||
en: """listener status """
|
||||
zh: """监听器状态"""
|
||||
}
|
||||
}
|
||||
|
||||
listener_node_status {
|
||||
desc {
|
||||
en: """listener status of each node in the cluster"""
|
||||
|
|
|
@ -81,7 +81,7 @@ paths() ->
|
|||
|
||||
listeners(get, #{bindings := #{name := Name0}}) ->
|
||||
with_gateway(Name0, fun(GwName, _) ->
|
||||
Result = get_cluster_listeners_info(GwName),
|
||||
Result = lists:map(fun bind2str/1, get_cluster_listeners_info(GwName)),
|
||||
{200, Result}
|
||||
end);
|
||||
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
|
||||
|
@ -119,7 +119,7 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
|
|||
with_gateway(Name0, fun(_GwName, _) ->
|
||||
case emqx_gateway_conf:listener(ListenerId) of
|
||||
{ok, Listener} ->
|
||||
{200, Listener};
|
||||
{200, bind2str(Listener)};
|
||||
{error, not_found} ->
|
||||
return_http_error(404, "Listener not found");
|
||||
{error, Reason} ->
|
||||
|
@ -266,11 +266,14 @@ get_cluster_listeners_info(GwName) ->
|
|||
ClusterStatus
|
||||
),
|
||||
|
||||
{MaxCons, CurrCons} = emqx_gateway_http:sum_cluster_connections(NodeStatus),
|
||||
{MaxCons, CurrCons, Running} = aggregate_listener_status(NodeStatus),
|
||||
|
||||
Listener#{
|
||||
status => #{
|
||||
running => Running,
|
||||
max_connections => MaxCons,
|
||||
current_connections => CurrCons,
|
||||
current_connections => CurrCons
|
||||
},
|
||||
node_status => NodeStatus
|
||||
}
|
||||
end,
|
||||
|
@ -292,22 +295,25 @@ do_listeners_cluster_status(Listeners) ->
|
|||
fun({Id, ListenOn}, Acc) ->
|
||||
BinId = erlang:atom_to_binary(Id),
|
||||
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
|
||||
Curr =
|
||||
{Running, Curr} =
|
||||
try esockd:get_current_connections({Id, ListenOn}) of
|
||||
Int -> Int
|
||||
Int -> {true, Int}
|
||||
catch
|
||||
%% not started
|
||||
error:not_found ->
|
||||
0
|
||||
{false, 0}
|
||||
end,
|
||||
Acc#{
|
||||
Id => #{
|
||||
node => Node,
|
||||
status => #{
|
||||
running => Running,
|
||||
current_connections => Curr,
|
||||
%% XXX: Since it is taken from raw-conf, it is possible a string
|
||||
max_connections => int(Max)
|
||||
}
|
||||
}
|
||||
}
|
||||
end,
|
||||
#{},
|
||||
Listeners
|
||||
|
@ -317,6 +323,31 @@ int(B) when is_binary(B) ->
|
|||
binary_to_integer(B);
|
||||
int(I) when is_integer(I) ->
|
||||
I.
|
||||
aggregate_listener_status(NodeStatus) ->
|
||||
aggregate_listener_status(NodeStatus, 0, 0, undefined).
|
||||
|
||||
aggregate_listener_status(
|
||||
[
|
||||
#{status := #{running := Running, max_connections := Max, current_connections := Current}}
|
||||
| T
|
||||
],
|
||||
MaxAcc,
|
||||
CurrAcc,
|
||||
RunningAcc
|
||||
) ->
|
||||
NRunning = aggregate_running(Running, RunningAcc),
|
||||
aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning);
|
||||
aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) ->
|
||||
{MaxAcc, CurrAcc, RunningAcc}.
|
||||
|
||||
aggregate_running(R, R) -> R;
|
||||
aggregate_running(R, undefined) -> R;
|
||||
aggregate_running(_, _) -> inconsistent.
|
||||
|
||||
bind2str(Listener = #{bind := Bind}) ->
|
||||
Listener#{bind := iolist_to_binary(emqx_listeners:format_bind(Bind))};
|
||||
bind2str(Listener = #{<<"bind">> := Bind}) ->
|
||||
Listener#{<<"bind">> := iolist_to_binary(emqx_listeners:format_bind(Bind))}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Swagger defines
|
||||
|
@ -590,22 +621,25 @@ params_paging_in_qs() ->
|
|||
roots() ->
|
||||
[listener].
|
||||
|
||||
fields(listener_node_status) ->
|
||||
fields(listener_status) ->
|
||||
[
|
||||
{current_connections, mk(non_neg_integer(), #{desc => ?DESC(current_connections)})},
|
||||
{status,
|
||||
mk(ref(emqx_mgmt_api_listeners, status), #{
|
||||
desc => ?DESC(listener_status)
|
||||
})},
|
||||
{node_status,
|
||||
mk(hoconsc:array(ref(emqx_mgmt_api_listeners, node_status)), #{
|
||||
desc => ?DESC(listener_node_status)
|
||||
})}
|
||||
];
|
||||
fields(tcp_listener) ->
|
||||
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_node_status);
|
||||
emqx_gateway_api:fields(tcp_listener) ++ fields(listener_status);
|
||||
fields(ssl_listener) ->
|
||||
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_node_status);
|
||||
emqx_gateway_api:fields(ssl_listener) ++ fields(listener_status);
|
||||
fields(udp_listener) ->
|
||||
emqx_gateway_api:fields(udp_listener) ++ fields(listener_node_status);
|
||||
emqx_gateway_api:fields(udp_listener) ++ fields(listener_status);
|
||||
fields(dtls_listener) ->
|
||||
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_node_status);
|
||||
emqx_gateway_api:fields(dtls_listener) ++ fields(listener_status);
|
||||
fields(_) ->
|
||||
[].
|
||||
|
||||
|
@ -623,13 +657,20 @@ listener_node_status_schema() ->
|
|||
examples_listener_list() ->
|
||||
Convert = fun(Cfg) ->
|
||||
Cfg#{
|
||||
current_connections => 0,
|
||||
status => #{
|
||||
running => true,
|
||||
max_connections => 1024000,
|
||||
current_connections => 10
|
||||
},
|
||||
node_status => [
|
||||
#{
|
||||
node => <<"127.0.0.1">>,
|
||||
current_connections => 0,
|
||||
node => <<"emqx@127.0.0.1">>,
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 10,
|
||||
max_connections => 1024000
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
end,
|
||||
|
|
|
@ -181,24 +181,11 @@ do_convert_listener(GwName, LType, Conf) ->
|
|||
|
||||
do_convert_listener2(GwName, LType, LName, LConf) ->
|
||||
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
|
||||
Running = emqx_gateway_utils:is_running(ListenerId, LConf),
|
||||
bind2str(
|
||||
LConf#{
|
||||
id => ListenerId,
|
||||
type => LType,
|
||||
name => LName,
|
||||
running => Running
|
||||
}
|
||||
).
|
||||
|
||||
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
|
||||
maps:put(bind, integer_to_binary(Bind), LConf);
|
||||
bind2str(LConf = #{<<"bind">> := Bind}) when is_integer(Bind) ->
|
||||
maps:put(<<"bind">>, integer_to_binary(Bind), LConf);
|
||||
bind2str(LConf = #{bind := Bind}) when is_binary(Bind) ->
|
||||
LConf;
|
||||
bind2str(LConf = #{<<"bind">> := Bind}) when is_binary(Bind) ->
|
||||
LConf.
|
||||
name => LName
|
||||
}.
|
||||
|
||||
get_bind(#{bind := Bind}) ->
|
||||
emqx_gateway_utils:parse_listenon(Bind);
|
||||
|
|
|
@ -37,7 +37,6 @@
|
|||
|
||||
-export([
|
||||
apply/2,
|
||||
format_listenon/1,
|
||||
parse_listenon/1,
|
||||
unix_ts_to_rfc3339/1,
|
||||
unix_ts_to_rfc3339/2,
|
||||
|
@ -165,7 +164,7 @@ start_listener(
|
|||
{Type, LisName, ListenOn, SocketOpts, Cfg},
|
||||
ModCfg
|
||||
) ->
|
||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
||||
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
||||
|
||||
NCfg = maps:merge(Cfg, ModCfg),
|
||||
|
@ -243,7 +242,7 @@ stop_listeners(GwName, Listeners) ->
|
|||
-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
|
||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
||||
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||
case StopRet of
|
||||
ok ->
|
||||
console_print(
|
||||
|
@ -287,13 +286,6 @@ apply(F, A2) when
|
|||
->
|
||||
erlang:apply(F, A2).
|
||||
|
||||
format_listenon(Port) when is_integer(Port) ->
|
||||
io_lib:format("0.0.0.0:~w", [Port]);
|
||||
format_listenon({Addr, Port}) when is_list(Addr) ->
|
||||
io_lib:format("~ts:~w", [Addr, Port]);
|
||||
format_listenon({Addr, Port}) when is_tuple(Addr) ->
|
||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
||||
|
||||
parse_listenon(Port) when is_integer(Port) ->
|
||||
Port;
|
||||
parse_listenon(IpPort) when is_tuple(IpPort) ->
|
||||
|
|
|
@ -167,7 +167,7 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
|||
)}
|
||||
]
|
||||
end,
|
||||
ListenOnStr = emqx_listeners:format_addr(ListenOn),
|
||||
ListenOnStr = emqx_listeners:format_bind(ListenOn),
|
||||
case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of
|
||||
{ok, _SvrPid} ->
|
||||
console_print(
|
||||
|
|
|
@ -340,7 +340,7 @@ t_listeners_tcp(_) ->
|
|||
LisConf = #{
|
||||
name => <<"def">>,
|
||||
type => <<"tcp">>,
|
||||
bind => <<"61613">>
|
||||
bind => <<"127.0.0.1:61613">>
|
||||
},
|
||||
{201, _} = request(post, "/gateway/stomp/listeners", LisConf),
|
||||
{200, ConfResp} = request(get, "/gateway/stomp/listeners"),
|
||||
|
@ -348,7 +348,7 @@ t_listeners_tcp(_) ->
|
|||
{200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
|
||||
assert_confs(LisConf, ConfResp1),
|
||||
|
||||
LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}),
|
||||
LisConf2 = maps:merge(LisConf, #{bind => <<"127.0.0.1:61614">>}),
|
||||
{200, _} = request(
|
||||
put,
|
||||
"/gateway/stomp/listeners/stomp:tcp:def",
|
||||
|
@ -369,7 +369,7 @@ t_listeners_authn(_) ->
|
|||
#{
|
||||
name => <<"def">>,
|
||||
type => <<"tcp">>,
|
||||
bind => <<"61613">>
|
||||
bind => <<"127.0.0.1:61613">>
|
||||
}
|
||||
]
|
||||
},
|
||||
|
@ -405,7 +405,7 @@ t_listeners_authn_data_mgmt(_) ->
|
|||
#{
|
||||
name => <<"def">>,
|
||||
type => <<"tcp">>,
|
||||
bind => <<"61613">>
|
||||
bind => <<"127.0.0.1:61613">>
|
||||
}
|
||||
]
|
||||
},
|
||||
|
|
|
@ -218,11 +218,13 @@ fields(listener_type_status) ->
|
|||
fields(listener_id_status) ->
|
||||
fields(listener_id) ++
|
||||
[
|
||||
{type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})},
|
||||
{name, ?HOCON(string(), #{desc => "Listener name", required => true})},
|
||||
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
||||
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
|
||||
{bind,
|
||||
?HOCON(
|
||||
hoconsc:union([emqx_schema:ip_port(), integer()]),
|
||||
emqx_schema:ip_port(),
|
||||
#{desc => "Listener bind addr", required => true}
|
||||
)},
|
||||
{acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
|
||||
|
@ -231,12 +233,24 @@ fields(listener_id_status) ->
|
|||
];
|
||||
fields(status) ->
|
||||
[
|
||||
{running,
|
||||
?HOCON(
|
||||
hoconsc:union([inconsistent, boolean()]),
|
||||
#{desc => "Listener running status", required => true}
|
||||
)},
|
||||
{max_connections,
|
||||
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
||||
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
||||
];
|
||||
fields(node_status) ->
|
||||
fields(node) ++ fields(status);
|
||||
[
|
||||
{"node",
|
||||
?HOCON(atom(), #{
|
||||
desc => "Node name",
|
||||
example => "emqx@127.0.0.1"
|
||||
})},
|
||||
{status, ?HOCON(?R_REF(status))}
|
||||
];
|
||||
fields(Type) ->
|
||||
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
||||
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
||||
|
@ -311,7 +325,7 @@ listener_type_status(get, _Request) ->
|
|||
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
||||
List = lists:map(
|
||||
fun({Type, L}) ->
|
||||
L1 = maps:without([bind, acceptors], L),
|
||||
L1 = maps:without([bind, acceptors, name], L),
|
||||
L1#{type => Type}
|
||||
end,
|
||||
Listeners
|
||||
|
@ -453,7 +467,7 @@ listener_status_by_id(NodeL) ->
|
|||
fun({Id, L}) ->
|
||||
L1 = maps:remove(ids, L),
|
||||
#{node_status := Nodes} = L1,
|
||||
L1#{number => maps:size(Nodes), id => Id}
|
||||
L1#{number => length(Nodes), id => Id}
|
||||
end,
|
||||
Listeners
|
||||
).
|
||||
|
@ -510,67 +524,75 @@ wrap_rpc(Res) ->
|
|||
format_status(Key, Node, Listener, Acc) ->
|
||||
#{
|
||||
<<"id">> := Id,
|
||||
<<"type">> := Type,
|
||||
<<"enabled">> := Enabled,
|
||||
<<"running">> := Running,
|
||||
<<"max_connections">> := MaxConnections,
|
||||
<<"current_connections">> := CurrentConnections,
|
||||
<<"acceptors">> := Acceptors,
|
||||
<<"bind">> := Bind
|
||||
} = Listener,
|
||||
{ok, #{name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||
GroupKey = maps:get(Key, Listener),
|
||||
case maps:find(GroupKey, Acc) of
|
||||
error ->
|
||||
Acc#{
|
||||
GroupKey => #{
|
||||
enable => Running,
|
||||
name => Name,
|
||||
type => Type,
|
||||
enable => Enabled,
|
||||
ids => [Id],
|
||||
acceptors => Acceptors,
|
||||
bind => Bind,
|
||||
bind => iolist_to_binary(emqx_listeners:format_bind(Bind)),
|
||||
status => #{
|
||||
running => Running,
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
},
|
||||
node_status => #{
|
||||
Node => #{
|
||||
node_status => [
|
||||
#{
|
||||
node => Node,
|
||||
status => #{
|
||||
running => Running,
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
};
|
||||
{ok, GroupValue} ->
|
||||
#{
|
||||
ids := Ids,
|
||||
status := #{
|
||||
running := Running0,
|
||||
max_connections := MaxConnections0,
|
||||
current_connections := CurrentConnections0
|
||||
},
|
||||
node_status := NodeStatus0
|
||||
} = GroupValue,
|
||||
NodeStatus =
|
||||
case maps:find(Node, NodeStatus0) of
|
||||
error ->
|
||||
NodeStatus0#{
|
||||
Node => #{
|
||||
NodeStatus = [
|
||||
#{
|
||||
node => Node,
|
||||
status => #{
|
||||
running => Running,
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
}
|
||||
};
|
||||
{ok, #{
|
||||
max_connections := PrevMax,
|
||||
current_connections := PrevCurr
|
||||
}} ->
|
||||
NodeStatus0#{
|
||||
Node => #{
|
||||
max_connections => max_conn(MaxConnections, PrevMax),
|
||||
current_connections => CurrentConnections + PrevCurr
|
||||
}
|
||||
}
|
||||
| NodeStatus0
|
||||
],
|
||||
NRunning =
|
||||
case Running == Running0 of
|
||||
true -> Running0;
|
||||
_ -> inconsistent
|
||||
end,
|
||||
Acc#{
|
||||
GroupKey =>
|
||||
GroupValue#{
|
||||
ids => lists:usort([Id | Ids]),
|
||||
status => #{
|
||||
running => NRunning,
|
||||
max_connections => max_conn(MaxConnections0, MaxConnections),
|
||||
current_connections => CurrentConnections0 + CurrentConnections
|
||||
},
|
||||
|
@ -605,17 +627,27 @@ listener_type_status_example() ->
|
|||
#{
|
||||
enable => false,
|
||||
ids => ["tcp:demo"],
|
||||
node_status => #{
|
||||
'emqx@127.0.0.1' => #{
|
||||
node_status =>
|
||||
[
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 11,
|
||||
max_connections => 1024000
|
||||
},
|
||||
'emqx@127.0.0.2' => #{
|
||||
current_connections => 10,
|
||||
max_connections => 1024000
|
||||
}
|
||||
},
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 10,
|
||||
max_connections => 1024000
|
||||
}
|
||||
}
|
||||
],
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 21,
|
||||
max_connections => 2048000
|
||||
},
|
||||
|
@ -624,17 +656,28 @@ listener_type_status_example() ->
|
|||
#{
|
||||
enable => false,
|
||||
ids => ["ssl:default"],
|
||||
node_status => #{
|
||||
'emqx@127.0.0.1' => #{
|
||||
node_status =>
|
||||
[
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 31,
|
||||
max_connections => infinity
|
||||
},
|
||||
'emqx@127.0.0.2' => #{
|
||||
current_connections => 40,
|
||||
max_connections => infinity
|
||||
}
|
||||
},
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 40,
|
||||
max_connections => infinity
|
||||
}
|
||||
}
|
||||
],
|
||||
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 71,
|
||||
max_connections => infinity
|
||||
},
|
||||
|
@ -649,18 +692,30 @@ listener_id_status_example() ->
|
|||
bind => <<"0.0.0.0:1884">>,
|
||||
enable => true,
|
||||
id => <<"tcp:demo">>,
|
||||
node_status => #{
|
||||
'emqx@127.0.0.1' => #{
|
||||
type => <<"tcp">>,
|
||||
name => <<"demo">>,
|
||||
node_status =>
|
||||
[
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 100,
|
||||
max_connections => 1024000
|
||||
},
|
||||
'emqx@127.0.0.2' => #{
|
||||
current_connections => 101,
|
||||
max_connections => 1024000
|
||||
}
|
||||
},
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 101,
|
||||
max_connections => 1024000
|
||||
}
|
||||
}
|
||||
],
|
||||
number => 2,
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 201,
|
||||
max_connections => 2048000
|
||||
}
|
||||
|
@ -670,18 +725,30 @@ listener_id_status_example() ->
|
|||
bind => <<"0.0.0.0:1883">>,
|
||||
enable => true,
|
||||
id => <<"tcp:default">>,
|
||||
node_status => #{
|
||||
'emqx@127.0.0.1' => #{
|
||||
current_connections => 300,
|
||||
max_connections => infinity
|
||||
},
|
||||
'emqx@127.0.0.2' => #{
|
||||
current_connections => 201,
|
||||
type => <<"tcp">>,
|
||||
name => <<"default">>,
|
||||
node_status =>
|
||||
[
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 200,
|
||||
max_connections => infinity
|
||||
}
|
||||
},
|
||||
#{
|
||||
node => 'emqx@127.0.0.1',
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 301,
|
||||
max_connections => infinity
|
||||
}
|
||||
}
|
||||
],
|
||||
number => 2,
|
||||
status => #{
|
||||
running => true,
|
||||
current_connections => 501,
|
||||
max_connections => infinity
|
||||
}
|
||||
|
|
|
@ -582,7 +582,7 @@ listeners([]) ->
|
|||
end,
|
||||
Info =
|
||||
[
|
||||
{listen_on, {string, format_listen_on(Bind)}},
|
||||
{listen_on, {string, emqx_listeners:format_bind(Bind)}},
|
||||
{acceptors, Acceptors},
|
||||
{proxy_protocol, ProxyProtocol},
|
||||
{running, Running}
|
||||
|
@ -802,15 +802,6 @@ indent_print({Key, {string, Val}}) ->
|
|||
indent_print({Key, Val}) ->
|
||||
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
|
||||
|
||||
format_listen_on(Port) when is_integer(Port) ->
|
||||
io_lib:format("0.0.0.0:~w", [Port]);
|
||||
format_listen_on({Addr, Port}) when is_list(Addr) ->
|
||||
io_lib:format("~ts:~w", [Addr, Port]);
|
||||
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 4 ->
|
||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]);
|
||||
format_listen_on({Addr, Port}) when is_tuple(Addr) andalso tuple_size(Addr) == 8 ->
|
||||
io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]).
|
||||
|
||||
name(Filter) ->
|
||||
iolist_to_binary(["CLI-", Filter]).
|
||||
|
||||
|
|
2
bin/emqx
2
bin/emqx
|
@ -416,7 +416,7 @@ call_hocon() {
|
|||
## and parsing HOCON config + environment variables is a non-trivial task
|
||||
CONF_KEYS=( 'node.data_dir' 'node.name' 'node.cookie' 'node.db_backend' 'cluster.proto_dist' )
|
||||
if [ "$IS_ENTERPRISE" = 'yes' ]; then
|
||||
CONF_KEYS+=( 'license.file' 'license.key' )
|
||||
CONF_KEYS+=( 'license.type' 'license.file' 'license.key' )
|
||||
fi
|
||||
|
||||
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
|
||||
|
|
|
@ -25,9 +25,9 @@ main(Args) ->
|
|||
%% forward the call to hocon_cli
|
||||
hocon_cli:main(Rest);
|
||||
["check_license_key", Key] ->
|
||||
check_license(#{key => list_to_binary(Key)});
|
||||
check_license(#{type => key, key => list_to_binary(Key)});
|
||||
["check_license_file", File] ->
|
||||
check_license(#{file => list_to_binary(File)});
|
||||
check_license(#{type => file, file => list_to_binary(File)});
|
||||
_ ->
|
||||
do(Args)
|
||||
end.
|
||||
|
|
1
build
1
build
|
@ -57,6 +57,7 @@ if [ "${SYSTEM}" = 'windows' ]; then
|
|||
# windows does not like the find
|
||||
FIND="/usr/bin/find"
|
||||
TAR="/usr/bin/tar"
|
||||
export BUILD_WITHOUT_ROCKSDB="on"
|
||||
else
|
||||
FIND='find'
|
||||
TAR='tar'
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
license {
|
||||
type = key
|
||||
# The default license has 1000 connections limit, it is issued on 20220419 and valid for 5 years (1825 days)
|
||||
key = "MjIwMTExCjAKMTAKRXZhbHVhdGlvbgpjb250YWN0QGVtcXguaW8KZGVmYXVsdAoyMDIyMDQxOQoxODI1CjEwMDAK.MEQCICbgRVijCQov2hrvZXR1mk9Oa+tyV1F5oJ6iOZeSHjnQAiB9dUiVeaZekDOjztk+NCWjhk4PG8tWfw2uFZWruSzD6g=="
|
||||
connection_low_watermark = 75%,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_license, [
|
||||
{description, "EMQX License"},
|
||||
{vsn, "5.0.0"},
|
||||
{vsn, "5.0.1"},
|
||||
{modules, []},
|
||||
{registered, [emqx_license_sup]},
|
||||
{applications, [kernel, stdlib]},
|
||||
|
|
|
@ -22,7 +22,9 @@
|
|||
read_license/0,
|
||||
read_license/1,
|
||||
update_file/1,
|
||||
update_key/1
|
||||
update_key/1,
|
||||
license_dir/0,
|
||||
save_and_backup_license/1
|
||||
]).
|
||||
|
||||
-define(CONF_KEY_PATH, [license]).
|
||||
|
@ -54,15 +56,29 @@ unload() ->
|
|||
emqx_conf:remove_handler(?CONF_KEY_PATH),
|
||||
emqx_license_cli:unload().
|
||||
|
||||
-spec license_dir() -> file:filename().
|
||||
license_dir() ->
|
||||
filename:join([emqx:data_dir(), licenses]).
|
||||
|
||||
%% Subdirectory relative to data dir.
|
||||
-spec relative_license_path() -> file:filename().
|
||||
relative_license_path() ->
|
||||
filename:join([licenses, "emqx.lic"]).
|
||||
|
||||
-spec update_file(binary() | string()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update_file(Filename) when is_binary(Filename); is_list(Filename) ->
|
||||
case file:read_file(Filename) of
|
||||
{ok, Contents} ->
|
||||
Result = emqx_conf:update(
|
||||
?CONF_KEY_PATH,
|
||||
{file, Filename},
|
||||
{file, Contents},
|
||||
#{rawconf_with_defaults => true, override_to => local}
|
||||
),
|
||||
handle_config_update_result(Result).
|
||||
handle_config_update_result(Result);
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
-spec update_key(binary() | string()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
|
@ -125,22 +141,18 @@ del_license_hook() ->
|
|||
_ = emqx_hooks:del('client.connect', {?MODULE, check, []}),
|
||||
ok.
|
||||
|
||||
do_update({file, Filename}, Conf) ->
|
||||
case file:read_file(Filename) of
|
||||
{ok, Content} ->
|
||||
case emqx_license_parser:parse(Content) of
|
||||
{ok, _License} ->
|
||||
maps:remove(<<"key">>, Conf#{<<"file">> => Filename});
|
||||
{error, Reason} ->
|
||||
erlang:throw(Reason)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
erlang:throw({invalid_license_file, Reason})
|
||||
end;
|
||||
do_update({file, NewContents}, Conf) ->
|
||||
Res = emqx_license_proto_v2:save_and_backup_license(mria_mnesia:running_nodes(), NewContents),
|
||||
%% assert
|
||||
true = lists:all(fun(X) -> X =:= {ok, ok} end, Res),
|
||||
%% Must be relative to the data dir, since different nodes might
|
||||
%% have different data directories configured...
|
||||
LicensePath = relative_license_path(),
|
||||
maps:remove(<<"key">>, Conf#{<<"type">> => file, <<"file">> => LicensePath});
|
||||
do_update({key, Content}, Conf) when is_binary(Content); is_list(Content) ->
|
||||
case emqx_license_parser:parse(Content) of
|
||||
{ok, _License} ->
|
||||
maps:remove(<<"file">>, Conf#{<<"key">> => Content});
|
||||
maps:remove(<<"file">>, Conf#{<<"type">> => key, <<"key">> => Content});
|
||||
{error, Reason} ->
|
||||
erlang:throw(Reason)
|
||||
end;
|
||||
|
@ -148,17 +160,61 @@ do_update({key, Content}, Conf) when is_binary(Content); is_list(Content) ->
|
|||
do_update(_Other, Conf) ->
|
||||
Conf.
|
||||
|
||||
save_and_backup_license(NewLicenseKey) ->
|
||||
%% Must be relative to the data dir, since different nodes might
|
||||
%% have different data directories configured...
|
||||
CurrentLicensePath = filename:join(emqx:data_dir(), relative_license_path()),
|
||||
LicenseDir = filename:dirname(CurrentLicensePath),
|
||||
case filelib:ensure_dir(CurrentLicensePath) of
|
||||
ok -> ok;
|
||||
{error, EnsureError} -> throw({error_creating_license_dir, EnsureError})
|
||||
end,
|
||||
case file:read_file(CurrentLicensePath) of
|
||||
{ok, NewLicenseKey} ->
|
||||
%% same contents; nothing to do.
|
||||
ok;
|
||||
{ok, _OldContents} ->
|
||||
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
|
||||
BackupPath = filename:join([
|
||||
LicenseDir,
|
||||
"emqx.lic." ++ Time ++ ".backup"
|
||||
]),
|
||||
case file:copy(CurrentLicensePath, BackupPath) of
|
||||
{ok, _} -> ok;
|
||||
{error, CopyError} -> throw({error_backing_up_license, CopyError})
|
||||
end,
|
||||
ok;
|
||||
{error, enoent} ->
|
||||
ok;
|
||||
{error, Error} ->
|
||||
throw({error_reading_existing_license, Error})
|
||||
end,
|
||||
case file:write_file(CurrentLicensePath, NewLicenseKey) of
|
||||
ok -> ok;
|
||||
{error, WriteError} -> throw({error_writing_license, WriteError})
|
||||
end,
|
||||
ok.
|
||||
|
||||
check_max_clients_exceeded(MaxClients) ->
|
||||
emqx_license_resources:connection_count() > MaxClients * 1.1.
|
||||
|
||||
read_license(#{file := Filename}) ->
|
||||
read_license(#{type := file, file := Filename}) ->
|
||||
case file:read_file(Filename) of
|
||||
{ok, Content} ->
|
||||
emqx_license_parser:parse(Content);
|
||||
{error, _} = Error ->
|
||||
%% Could be a relative path in data folder after update.
|
||||
FilenameDataDir = filename:join(emqx:data_dir(), Filename),
|
||||
case file:read_file(FilenameDataDir) of
|
||||
{ok, Content} -> emqx_license_parser:parse(Content);
|
||||
{error, _} = Error -> Error
|
||||
_Error -> Error
|
||||
end
|
||||
end;
|
||||
read_license(#{key := Content}) ->
|
||||
read_license(#{type := key, key := Content}) ->
|
||||
emqx_license_parser:parse(Content).
|
||||
|
||||
handle_config_update_result({error, {post_config_update, ?MODULE, Error}}) ->
|
||||
{error, Error};
|
||||
handle_config_update_result({error, _} = Error) ->
|
||||
Error;
|
||||
handle_config_update_result({ok, #{post_config_update := #{emqx_license := Result}}}) ->
|
||||
|
|
|
@ -128,6 +128,6 @@ ensure_timer(#{check_peer_interval := CheckInterval} = State) ->
|
|||
|
||||
remote_connection_count() ->
|
||||
Nodes = mria_mnesia:running_nodes() -- [node()],
|
||||
Results = emqx_license_proto_v1:remote_connection_counts(Nodes),
|
||||
Results = emqx_license_proto_v2:remote_connection_counts(Nodes),
|
||||
Counts = [Count || {ok, Count} <- Results],
|
||||
lists:sum(Counts).
|
||||
|
|
|
@ -14,14 +14,15 @@
|
|||
|
||||
-export([roots/0, fields/1, validations/0, desc/1]).
|
||||
|
||||
-export([
|
||||
license_type/0
|
||||
]).
|
||||
|
||||
roots() ->
|
||||
[
|
||||
{license,
|
||||
hoconsc:mk(
|
||||
hoconsc:union([
|
||||
hoconsc:ref(?MODULE, key_license),
|
||||
hoconsc:ref(?MODULE, file_license)
|
||||
]),
|
||||
license_type(),
|
||||
#{
|
||||
desc =>
|
||||
"EMQX Enterprise license.\n"
|
||||
|
@ -36,16 +37,35 @@ roots() ->
|
|||
|
||||
fields(key_license) ->
|
||||
[
|
||||
{type, #{
|
||||
type => key,
|
||||
required => true
|
||||
}},
|
||||
{key, #{
|
||||
type => string(),
|
||||
%% so it's not logged
|
||||
sensitive => true,
|
||||
required => true,
|
||||
desc => "License string"
|
||||
}},
|
||||
{file, #{
|
||||
type => string(),
|
||||
required => false
|
||||
}}
|
||||
| common_fields()
|
||||
];
|
||||
fields(file_license) ->
|
||||
[
|
||||
{type, #{
|
||||
type => file,
|
||||
required => true
|
||||
}},
|
||||
{key, #{
|
||||
type => string(),
|
||||
%% so it's not logged
|
||||
sensitive => true,
|
||||
required => false
|
||||
}},
|
||||
{file, #{
|
||||
type => string(),
|
||||
desc => "Path to the license file"
|
||||
|
@ -77,6 +97,12 @@ common_fields() ->
|
|||
validations() ->
|
||||
[{check_license_watermark, fun check_license_watermark/1}].
|
||||
|
||||
license_type() ->
|
||||
hoconsc:union([
|
||||
hoconsc:ref(?MODULE, key_license),
|
||||
hoconsc:ref(?MODULE, file_license)
|
||||
]).
|
||||
|
||||
check_license_watermark(Conf) ->
|
||||
case hocon_maps:get("license.connection_low_watermark", Conf) of
|
||||
undefined ->
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_license_proto_v2).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
-export([introduced_in/0]).
|
||||
|
||||
-export([
|
||||
remote_connection_counts/1,
|
||||
save_and_backup_license/2
|
||||
]).
|
||||
|
||||
-define(TIMEOUT, 500).
|
||||
-define(BACKUP_TIMEOUT, 15_000).
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.5".
|
||||
|
||||
-spec remote_connection_counts(list(node())) -> list({atom(), term()}).
|
||||
remote_connection_counts(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_license_resources, local_connection_count, [], ?TIMEOUT).
|
||||
|
||||
-spec save_and_backup_license(list(node()), binary()) -> list({atom(), term()}).
|
||||
save_and_backup_license(Nodes, NewLicenseKey) ->
|
||||
erpc:multicall(Nodes, emqx_license, save_and_backup_license, [NewLicenseKey], ?BACKUP_TIMEOUT).
|
|
@ -28,39 +28,190 @@ end_per_suite(_) ->
|
|||
init_per_testcase(Case, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
set_invalid_license_file(Case),
|
||||
Config.
|
||||
Paths = set_override_paths(Case),
|
||||
Config0 = setup_test(Case, Config),
|
||||
Paths ++ Config0 ++ Config.
|
||||
|
||||
end_per_testcase(Case, _Config) ->
|
||||
end_per_testcase(Case, Config) ->
|
||||
restore_valid_license_file(Case),
|
||||
clean_overrides(Case, Config),
|
||||
teardown_test(Case, Config),
|
||||
ok.
|
||||
|
||||
set_override_paths(TestCase) when
|
||||
TestCase =:= t_change_from_file_to_key;
|
||||
TestCase =:= t_change_from_key_to_file
|
||||
->
|
||||
LocalOverridePath = filename:join([
|
||||
"/tmp",
|
||||
"local-" ++ atom_to_list(TestCase) ++ ".conf"
|
||||
]),
|
||||
ClusterOverridePath = filename:join([
|
||||
"/tmp",
|
||||
"local-" ++ atom_to_list(TestCase) ++ ".conf"
|
||||
]),
|
||||
application:set_env(emqx, local_override_conf_file, LocalOverridePath),
|
||||
application:set_env(emqx, cluster_override_conf_file, ClusterOverridePath),
|
||||
[
|
||||
{local_override_path, LocalOverridePath},
|
||||
{cluster_override_path, ClusterOverridePath}
|
||||
];
|
||||
set_override_paths(_TestCase) ->
|
||||
[].
|
||||
|
||||
clean_overrides(TestCase, Config) when
|
||||
TestCase =:= t_change_from_file_to_key;
|
||||
TestCase =:= t_change_from_key_to_file
|
||||
->
|
||||
LocalOverridePath = ?config(local_override_path, Config),
|
||||
ClusterOverridePath = ?config(cluster_override_path, Config),
|
||||
file:delete(LocalOverridePath),
|
||||
file:delete(ClusterOverridePath),
|
||||
application:unset_env(emqx, local_override_conf_file),
|
||||
application:unset_env(emqx, cluster_override_conf_file),
|
||||
ok;
|
||||
clean_overrides(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
setup_test(TestCase, Config) when
|
||||
TestCase =:= t_update_file_cluster_backup
|
||||
->
|
||||
DataDir = ?config(data_dir, Config),
|
||||
{LicenseKey, _License} = mk_license(
|
||||
[
|
||||
%% license format version
|
||||
"220111",
|
||||
%% license type
|
||||
"0",
|
||||
%% customer type
|
||||
"10",
|
||||
%% customer name
|
||||
"Foo",
|
||||
%% customer email
|
||||
"contact@foo.com",
|
||||
%% deplayment name
|
||||
"bar-deployment",
|
||||
%% start date
|
||||
"20220111",
|
||||
%% days
|
||||
"100000",
|
||||
%% max connections
|
||||
"19"
|
||||
]
|
||||
),
|
||||
Cluster = emqx_common_test_helpers:emqx_cluster(
|
||||
[core, core],
|
||||
[
|
||||
{apps, [emqx_conf, emqx_license]},
|
||||
{load_schema, false},
|
||||
{schema_mod, emqx_enterprise_conf_schema},
|
||||
{env_handler, fun
|
||||
(emqx) ->
|
||||
emqx_config:save_schema_mod_and_names(emqx_enterprise_conf_schema),
|
||||
%% emqx_config:save_schema_mod_and_names(emqx_license_schema),
|
||||
application:set_env(emqx, boot_modules, []),
|
||||
application:set_env(
|
||||
emqx,
|
||||
data_dir,
|
||||
filename:join([
|
||||
DataDir,
|
||||
TestCase,
|
||||
node()
|
||||
])
|
||||
),
|
||||
ok;
|
||||
(emqx_conf) ->
|
||||
emqx_config:save_schema_mod_and_names(emqx_enterprise_conf_schema),
|
||||
%% emqx_config:save_schema_mod_and_names(emqx_license_schema),
|
||||
application:set_env(
|
||||
emqx,
|
||||
data_dir,
|
||||
filename:join([
|
||||
DataDir,
|
||||
TestCase,
|
||||
node()
|
||||
])
|
||||
),
|
||||
ok;
|
||||
(emqx_license) ->
|
||||
LicensePath = filename:join(emqx_license:license_dir(), "emqx.lic"),
|
||||
filelib:ensure_dir(LicensePath),
|
||||
ok = file:write_file(LicensePath, LicenseKey),
|
||||
LicConfig = #{type => file, file => LicensePath},
|
||||
emqx_config:put([license], LicConfig),
|
||||
RawConfig = #{<<"type">> => file, <<"file">> => LicensePath},
|
||||
emqx_config:put_raw([<<"license">>], RawConfig),
|
||||
ok = meck:new(emqx_license, [non_strict, passthrough, no_history, no_link]),
|
||||
%% meck:expect(emqx_license, read_license, fun() -> {ok, License} end),
|
||||
meck:expect(
|
||||
emqx_license_parser,
|
||||
parse,
|
||||
fun(X) ->
|
||||
emqx_license_parser:parse(
|
||||
X,
|
||||
emqx_license_test_lib:public_key_pem()
|
||||
)
|
||||
end
|
||||
),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
end}
|
||||
]
|
||||
),
|
||||
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
|
||||
[{nodes, Nodes}, {cluster, Cluster}, {old_license, LicenseKey}];
|
||||
setup_test(_TestCase, _Config) ->
|
||||
[].
|
||||
|
||||
teardown_test(TestCase, Config) when
|
||||
TestCase =:= t_update_file_cluster_backup
|
||||
->
|
||||
Nodes = ?config(nodes, Config),
|
||||
lists:foreach(
|
||||
fun(N) ->
|
||||
LicenseDir = erpc:call(N, emqx_license, license_dir, []),
|
||||
{ok, _} = emqx_common_test_helpers:stop_slave(N),
|
||||
ok = file:del_dir_r(LicenseDir),
|
||||
ok
|
||||
end,
|
||||
Nodes
|
||||
),
|
||||
ok;
|
||||
teardown_test(_TestCase, _Config) ->
|
||||
ok.
|
||||
|
||||
set_invalid_license_file(t_read_license_from_invalid_file) ->
|
||||
Config = #{file => "/invalid/file"},
|
||||
Config = #{type => file, file => "/invalid/file"},
|
||||
emqx_config:put([license], Config);
|
||||
set_invalid_license_file(_) ->
|
||||
ok.
|
||||
|
||||
restore_valid_license_file(t_read_license_from_invalid_file) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
restore_valid_license_file(_) ->
|
||||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config),
|
||||
RawConfig = #{<<"file">> => emqx_license_test_lib:default_license()},
|
||||
RawConfig = #{<<"type">> => file, <<"file">> => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put_raw([<<"license">>], RawConfig);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
||||
assert_on_nodes(Nodes, RunFun, CheckFun) ->
|
||||
Res = [{N, erpc:call(N, RunFun)} || N <- Nodes],
|
||||
lists:foreach(CheckFun, Res).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Tests
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_update_file(_Config) ->
|
||||
?assertMatch(
|
||||
{error, {invalid_license_file, enoent}},
|
||||
{error, enoent},
|
||||
emqx_license:update_file("/unknown/path")
|
||||
),
|
||||
|
||||
|
@ -75,6 +226,115 @@ t_update_file(_Config) ->
|
|||
emqx_license:update_file(emqx_license_test_lib:default_license())
|
||||
).
|
||||
|
||||
t_update_file_cluster_backup(Config) ->
|
||||
OldLicenseKey = ?config(old_license, Config),
|
||||
Nodes = [N1 | _] = ?config(nodes, Config),
|
||||
|
||||
%% update the license file for the cluster
|
||||
{NewLicenseKey, NewDecodedLicense} = mk_license(
|
||||
[
|
||||
%% license format version
|
||||
"220111",
|
||||
%% license type
|
||||
"0",
|
||||
%% customer type
|
||||
"10",
|
||||
%% customer name
|
||||
"Foo",
|
||||
%% customer email
|
||||
"contact@foo.com",
|
||||
%% deplayment name
|
||||
"bar-deployment",
|
||||
%% start date
|
||||
"20220111",
|
||||
%% days
|
||||
"100000",
|
||||
%% max connections
|
||||
"190"
|
||||
]
|
||||
),
|
||||
NewLicensePath = "tmp_new_license.lic",
|
||||
ok = file:write_file(NewLicensePath, NewLicenseKey),
|
||||
{ok, _} = erpc:call(N1, emqx_license, update_file, [NewLicensePath]),
|
||||
|
||||
assert_on_nodes(
|
||||
Nodes,
|
||||
fun() ->
|
||||
Conf = emqx_conf:get([license]),
|
||||
emqx_license:read_license(Conf)
|
||||
end,
|
||||
fun({N, Res}) ->
|
||||
?assertMatch({ok, _}, Res, #{node => N}),
|
||||
{ok, License} = Res,
|
||||
?assertEqual(NewDecodedLicense, License, #{node => N})
|
||||
end
|
||||
),
|
||||
|
||||
assert_on_nodes(
|
||||
Nodes,
|
||||
fun() ->
|
||||
LicenseDir = emqx_license:license_dir(),
|
||||
file:list_dir(LicenseDir)
|
||||
end,
|
||||
fun({N, Res}) ->
|
||||
?assertMatch({ok, _}, Res, #{node => N}),
|
||||
{ok, DirContents} = Res,
|
||||
%% the now current license
|
||||
?assert(lists:member("emqx.lic", DirContents), #{node => N, dir_contents => DirContents}),
|
||||
%% the backed up old license
|
||||
?assert(
|
||||
lists:any(
|
||||
fun
|
||||
("emqx.lic." ++ Suffix) -> lists:suffix(".backup", Suffix);
|
||||
(_) -> false
|
||||
end,
|
||||
DirContents
|
||||
),
|
||||
#{node => N, dir_contents => DirContents}
|
||||
)
|
||||
end
|
||||
),
|
||||
|
||||
assert_on_nodes(
|
||||
Nodes,
|
||||
fun() ->
|
||||
LicenseDir = emqx_license:license_dir(),
|
||||
{ok, DirContents} = file:list_dir(LicenseDir),
|
||||
[BackupLicensePath0] = [
|
||||
F
|
||||
|| "emqx.lic." ++ F <- DirContents, lists:suffix(".backup", F)
|
||||
],
|
||||
BackupLicensePath = "emqx.lic." ++ BackupLicensePath0,
|
||||
{ok, BackupLicense} = file:read_file(filename:join(LicenseDir, BackupLicensePath)),
|
||||
{ok, NewLicense} = file:read_file(filename:join(LicenseDir, "emqx.lic")),
|
||||
#{
|
||||
backup => BackupLicense,
|
||||
new => NewLicense
|
||||
}
|
||||
end,
|
||||
fun({N, #{backup := BackupLicense, new := NewLicense}}) ->
|
||||
?assertEqual(OldLicenseKey, BackupLicense, #{node => N}),
|
||||
?assertEqual(NewLicenseKey, NewLicense, #{node => N})
|
||||
end
|
||||
),
|
||||
|
||||
%% uploading the same license twice should not generate extra backups.
|
||||
{ok, _} = erpc:call(N1, emqx_license, update_file, [NewLicensePath]),
|
||||
|
||||
assert_on_nodes(
|
||||
Nodes,
|
||||
fun() ->
|
||||
LicenseDir = emqx_license:license_dir(),
|
||||
{ok, DirContents} = file:list_dir(LicenseDir),
|
||||
[F || "emqx.lic." ++ F <- DirContents, lists:suffix(".backup", F)]
|
||||
end,
|
||||
fun({N, Backups}) ->
|
||||
?assertMatch([_], Backups, #{node => N})
|
||||
end
|
||||
),
|
||||
|
||||
ok.
|
||||
|
||||
t_update_value(_Config) ->
|
||||
?assertMatch(
|
||||
{error, [_ | _]},
|
||||
|
@ -95,7 +355,7 @@ t_read_license_from_invalid_file(_Config) ->
|
|||
).
|
||||
|
||||
t_check_exceeded(_Config) ->
|
||||
License = mk_license(
|
||||
{_, License} = mk_license(
|
||||
[
|
||||
"220111",
|
||||
"0",
|
||||
|
@ -124,7 +384,7 @@ t_check_exceeded(_Config) ->
|
|||
).
|
||||
|
||||
t_check_ok(_Config) ->
|
||||
License = mk_license(
|
||||
{_, License} = mk_license(
|
||||
[
|
||||
"220111",
|
||||
"0",
|
||||
|
@ -153,7 +413,7 @@ t_check_ok(_Config) ->
|
|||
).
|
||||
|
||||
t_check_expired(_Config) ->
|
||||
License = mk_license(
|
||||
{_, License} = mk_license(
|
||||
[
|
||||
"220111",
|
||||
%% Official customer
|
||||
|
@ -183,6 +443,39 @@ t_check_not_loaded(_Config) ->
|
|||
emqx_license:check(#{}, #{})
|
||||
).
|
||||
|
||||
t_change_from_file_to_key(_Config) ->
|
||||
%% precondition
|
||||
?assertMatch(#{file := _}, emqx_conf:get([license])),
|
||||
|
||||
OldConf = emqx_conf:get_raw([]),
|
||||
|
||||
%% this saves updated config to `{cluster,local}-overrrides.conf'
|
||||
{ok, LicenseValue} = file:read_file(emqx_license_test_lib:default_license()),
|
||||
{ok, _NewConf} = emqx_license:update_key(LicenseValue),
|
||||
|
||||
%% assert that `{cluster,local}-overrides.conf' merge correctly
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_license_schema, OldConf, #{})),
|
||||
|
||||
ok.
|
||||
|
||||
t_change_from_key_to_file(_Config) ->
|
||||
Config = #{type => key, key => <<"some key">>},
|
||||
emqx_config:put([license], Config),
|
||||
RawConfig = #{<<"type">> => key, <<"key">> => <<"some key">>},
|
||||
emqx_config:put_raw([<<"license">>], RawConfig),
|
||||
|
||||
%% precondition
|
||||
?assertMatch(#{type := key, key := _}, emqx_conf:get([license])),
|
||||
OldConf = emqx_conf:get_raw([]),
|
||||
|
||||
%% this saves updated config to `{cluster,local}-overrrides.conf'
|
||||
{ok, _NewConf} = emqx_license:update_file(emqx_license_test_lib:default_license()),
|
||||
|
||||
%% assert that `{cluster,local}-overrides.conf' merge correctly
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_license_schema, OldConf, #{})),
|
||||
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -193,4 +486,4 @@ mk_license(Fields) ->
|
|||
EncodedLicense,
|
||||
emqx_license_test_lib:public_key_pem()
|
||||
),
|
||||
License.
|
||||
{EncodedLicense, License}.
|
||||
|
|
|
@ -35,7 +35,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -31,9 +31,9 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config),
|
||||
RawConfig = #{<<"file">> => emqx_license_test_lib:default_license()},
|
||||
RawConfig = #{<<"type">> => file, <<"file">> => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put_raw([<<"license">>], RawConfig);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -31,7 +31,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -30,7 +30,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -30,7 +30,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
|
|
@ -31,7 +31,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
ok.
|
||||
|
||||
set_special_configs(emqx_license) ->
|
||||
Config = #{file => emqx_license_test_lib:default_license()},
|
||||
Config = #{type => file, file => emqx_license_test_lib:default_license()},
|
||||
emqx_config:put([license], Config);
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
@ -59,9 +59,9 @@ t_connection_count(_Config) ->
|
|||
meck:new(emqx_cm, [passthrough]),
|
||||
meck:expect(emqx_cm, get_connected_client_count, fun() -> 10 end),
|
||||
|
||||
meck:new(emqx_license_proto_v1, [passthrough]),
|
||||
meck:new(emqx_license_proto_v2, [passthrough]),
|
||||
meck:expect(
|
||||
emqx_license_proto_v1,
|
||||
emqx_license_proto_v2,
|
||||
remote_connection_counts,
|
||||
fun(_Nodes) ->
|
||||
[{ok, 5}, {error, some_error}]
|
||||
|
@ -82,8 +82,8 @@ t_connection_count(_Config) ->
|
|||
end
|
||||
),
|
||||
|
||||
meck:unload(emqx_license_proto_v1),
|
||||
meck:unload(emqx_license_proto_v2),
|
||||
meck:unload(emqx_cm).
|
||||
|
||||
t_emqx_license_proto(_Config) ->
|
||||
?assert("5.0.0" =< emqx_license_proto_v1:introduced_in()).
|
||||
?assert("5.0.0" =< emqx_license_proto_v2:introduced_in()).
|
||||
|
|
33
mix.exs
33
mix.exs
|
@ -30,29 +30,29 @@ defmodule EMQXUmbrella.MixProject do
|
|||
"""
|
||||
|
||||
def project() do
|
||||
check_profile!()
|
||||
profile_info = check_profile!()
|
||||
|
||||
[
|
||||
app: :emqx_mix,
|
||||
version: pkg_vsn(),
|
||||
deps: deps(),
|
||||
deps: deps(profile_info),
|
||||
releases: releases()
|
||||
]
|
||||
end
|
||||
|
||||
defp deps() do
|
||||
defp deps(profile_info) do
|
||||
# we need several overrides here because dependencies specify
|
||||
# other exact versions, and not ranges.
|
||||
[
|
||||
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
||||
{:redbug, "2.0.7"},
|
||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
|
||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.3.0"},
|
||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||
{:esockd, github: "emqx/esockd", tag: "5.9.3", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.13.2", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.13.3", override: true},
|
||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.5", override: true},
|
||||
|
@ -90,7 +90,8 @@ defmodule EMQXUmbrella.MixProject do
|
|||
# in conflict by grpc and eetcd
|
||||
{:gpb, "4.11.2", override: true, runtime: false},
|
||||
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}
|
||||
] ++ umbrella_apps() ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
|
||||
] ++
|
||||
umbrella_apps() ++ enterprise_apps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()
|
||||
end
|
||||
|
||||
defp umbrella_apps() do
|
||||
|
@ -99,13 +100,31 @@ defmodule EMQXUmbrella.MixProject do
|
|||
|> Enum.map(fn path ->
|
||||
app =
|
||||
path
|
||||
|> String.trim_leading("apps/")
|
||||
|> Path.basename()
|
||||
|> String.to_atom()
|
||||
|
||||
{app, path: path, manager: :rebar3, override: true}
|
||||
end)
|
||||
end
|
||||
|
||||
defp enterprise_apps(_profile_info = %{edition_type: :enterprise}) do
|
||||
"lib-ee/*"
|
||||
|> Path.wildcard()
|
||||
|> Enum.filter(&File.dir?/1)
|
||||
|> Enum.map(fn path ->
|
||||
app =
|
||||
path
|
||||
|> Path.basename()
|
||||
|> String.to_atom()
|
||||
|
||||
{app, path: path, manager: :rebar3, override: true}
|
||||
end)
|
||||
end
|
||||
|
||||
defp enterprise_apps(_profile_info) do
|
||||
[]
|
||||
end
|
||||
|
||||
defp releases() do
|
||||
[
|
||||
emqx: fn ->
|
||||
|
|
|
@ -49,12 +49,12 @@
|
|||
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
|
||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.3"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.2"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.3"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.5"}}}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
?SH-PROMPT
|
||||
|
||||
## create a webhook data bridge with id "my_webhook"
|
||||
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","request_timeout":"5s","max_retries":3,"type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
||||
!curl --user admin:public --silent --show-error 'http://localhost:18083/api/v5/bridges' -X 'POST' -H 'Content-Type: application/json' --data-binary '{"name":"my_webhook","body":"","method":"post","url":"http://webhook.emqx.io:7077/counter","headers":{"content-type":"application/json"},"pool_size":4,"enable_pipelining":100,"connect_timeout":"5s","type":"webhook","ssl":{"enable":false,"verify":"verify_none"}}' | jq '.status'
|
||||
?connected
|
||||
?SH-PROMPT
|
||||
|
||||
|
|
Loading…
Reference in New Issue