From ea558b2bc5c4a51e7a4d59ddfe3a348266bafb19 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 16 Feb 2022 12:29:04 +0800 Subject: [PATCH 1/6] feat(license): license expriy early alarm. --- lib-ee/emqx_license/etc/emqx_license.conf | 2 + lib-ee/emqx_license/include/emqx_license.hrl | 2 +- .../emqx_license/src/emqx_license_checker.erl | 50 +++++++++++++------ .../src/emqx_license_resources.erl | 19 +++++++ .../emqx_license/src/emqx_license_schema.erl | 36 +++++++++++-- 5 files changed, 89 insertions(+), 20 deletions(-) diff --git a/lib-ee/emqx_license/etc/emqx_license.conf b/lib-ee/emqx_license/etc/emqx_license.conf index 331d98aa8..6c5ad217b 100644 --- a/lib-ee/emqx_license/etc/emqx_license.conf +++ b/lib-ee/emqx_license/etc/emqx_license.conf @@ -1,3 +1,5 @@ license { key = "MjIwMTExCjAKMTAKRXZhbHVhdGlvbgpjb250YWN0QGVtcXguaW8KMjAyMjAxMDEKMzY1MDAKMTAK.MEUCIFc9EUjqB3SjpRqWjqmAzI4Tg4LwhCRet9scEoxMRt8fAiEAk6vfYUiPOTzBC+3EjNF3WmLTiA3B0TN5ZNwuTKbTXJQ=" + connection_low_watermark = 75%, + connection_high_watermark = 80% } diff --git a/lib-ee/emqx_license/include/emqx_license.hrl b/lib-ee/emqx_license/include/emqx_license.hrl index a906b7b4e..77bfd031f 100644 --- a/lib-ee/emqx_license/include/emqx_license.hrl +++ b/lib-ee/emqx_license/include/emqx_license.hrl @@ -21,7 +21,7 @@ "======================================================\n" "Your license has expired.\n" "Please visit https://emqx.com/apply-licenses/emqx or\n" - "contact our customer services for an updated license.\n" + "contact customer services.\n" "======================================================\n" ). diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index 5571eaada..b8ddd2d42 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -10,6 +10,7 @@ -behaviour(gen_server). -define(CHECK_INTERVAL, 5000). +-define(EXPIRY_ALARM_CHECK_INTERVAL, 24 * 60* 60). -export([start_link/1, start_link/2, @@ -70,16 +71,18 @@ purge() -> init([LicenseFetcher, CheckInterval]) -> case LicenseFetcher() of {ok, License} -> - ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table]), + ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table, read_concurrency]), #{} = check_license(License), - State = ensure_timer(#{check_license_interval => CheckInterval, + State0 = ensure_check_license_timer(#{check_license_interval => CheckInterval, license => License}), + State = ensure_check_expiry_timer(State0), {ok, State}; {error, _} = Error -> Error end. handle_call({update, License}, _From, State) -> + _ = expiry_early_alarm(License), {reply, check_license(License), State#{license => License}}; handle_call(dump, _From, #{license := License} = State) -> {reply, emqx_license_parser:dump(License), State}; @@ -94,10 +97,15 @@ handle_cast(_Msg, State) -> handle_info(check_license, #{license := License} = State) -> #{} = check_license(License), - NewState = ensure_timer(State), + NewState = ensure_check_license_timer(State), ?tp(debug, emqx_license_checked, #{}), {noreply, NewState}; +handle_info(check_expiry_alarm, #{license := License} = State) -> + _ = expiry_early_alarm(License), + NewState = ensure_check_expiry_timer(State), + {noreply, NewState}; + handle_info(_Msg, State) -> {noreply, State}. @@ -105,15 +113,21 @@ handle_info(_Msg, State) -> %% Private functions %%------------------------------------------------------------------------------ -ensure_timer(#{check_license_interval := CheckInterval} = State) -> - _ = case State of - #{timer := Timer} -> erlang:cancel_timer(Timer); - _ -> ok - end, +ensure_check_license_timer(#{check_license_interval := CheckInterval} = State) -> + cancel_timer(State, timer), State#{timer => erlang:send_after(CheckInterval, self(), check_license)}. +ensure_check_expiry_timer(State) -> + cancel_timer(State, expiry_alarm_timer), + Ref = erlang:send_after(?EXPIRY_ALARM_CHECK_INTERVAL, self(), check_expiry_alarm), + State#{expiry_alarm_timer => Ref}. + +cancel_timer(#{Key := Ref}, Key) when is_reference(Ref) -> erlang:cancel_timer(Ref); +cancel_timer(_, _) -> ok. + check_license(License) -> - NeedRestrict = need_restrict(License), + DaysLeft = days_left(License), + NeedRestrict = need_restrict(License, DaysLeft), Limits = limits(License, NeedRestrict), true = apply_limits(Limits), #{warn_evaluation => warn_evaluation(License, NeedRestrict), @@ -133,17 +147,25 @@ days_left(License) -> {DateNow, _} = calendar:universal_time(), calendar:date_to_gregorian_days(DateEnd) - calendar:date_to_gregorian_days(DateNow). -need_restrict(License)-> - DaysLeft = days_left(License), +need_restrict(License, DaysLeft)-> CType = emqx_license_parser:customer_type(License), Type = emqx_license_parser:license_type(License), DaysLeft < 0 - andalso (Type =/= ?OFFICIAL) or small_customer_overexpired(CType, DaysLeft). + andalso (Type =/= ?OFFICIAL) orelse small_customer_over_expired(CType, DaysLeft). -small_customer_overexpired(?SMALL_CUSTOMER, DaysLeft) +small_customer_over_expired(?SMALL_CUSTOMER, DaysLeft) when DaysLeft < ?EXPIRED_DAY -> true; -small_customer_overexpired(_CType, _DaysLeft) -> false. +small_customer_over_expired(_CType, _DaysLeft) -> false. apply_limits(Limits) -> ets:insert(?LICENSE_TAB, {limits, Limits}). + +expiry_early_alarm(License) -> + case days_left(License) < 30 of + true -> + DateEnd = emqx_license_parser:expiry_date(License), + catch emqx_alarm:activate(license_expiry, #{expiry_at => DateEnd}); + false -> + catch emqx_alarm:deactivate(license_expiry) + end. diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl index 33d661b1b..36988c9fa 100644 --- a/lib-ee/emqx_license/src/emqx_license_resources.erl +++ b/lib-ee/emqx_license/src/emqx_license_resources.erl @@ -60,6 +60,7 @@ handle_cast(_Msg, State) -> handle_info(update_resources, State) -> true = update_resources(), + connection_quota_early_alarm(), ?tp(debug, emqx_license_resources_updated, #{}), {noreply, ensure_timer(State)}. @@ -72,6 +73,24 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Private functions %%------------------------------------------------------------------------------ +connection_quota_early_alarm() -> + connection_quota_early_alarm(emqx_license_checker:limits()). + +connection_quota_early_alarm(#{max_connections := Max}) when is_integer(Max) -> + Count = connection_count(), + Low = emqx_conf:get([license, connection_low_watermark], 0.75), + High = emqx_conf:get([license, connection_high_watermark], 0.80), + if + Count > Max * High -> + HighPercent = float_to_binary(High * 100, [{decimals, 0}]), + Message = iolist_to_binary(["License: live connection number exceeds ", HighPercent, "%"]), + catch emqx_alarm:activate(license_quota, #{high_watermark => HighPercent}, Message); + Count < Max * Low -> + catch emqx_alarm:deactivate(license_quota); + true -> + ok + end; +connection_quota_early_alarm(_Limits) -> ok. cached_remote_connection_count() -> try ets:lookup(?MODULE, remote_connection_count) of diff --git a/lib-ee/emqx_license/src/emqx_license_schema.erl b/lib-ee/emqx_license/src/emqx_license_schema.erl index bd4a471f4..e35235777 100644 --- a/lib-ee/emqx_license/src/emqx_license_schema.erl +++ b/lib-ee/emqx_license/src/emqx_license_schema.erl @@ -12,20 +12,46 @@ -behaviour(hocon_schema). --export([roots/0, fields/1]). +-export([roots/0, fields/1, validations/0]). -roots() -> [{license, hoconsc:union( - [hoconsc:ref(?MODULE, key_license), - hoconsc:ref(?MODULE, file_license)])}]. +roots() -> [{license, + hoconsc:mk(hoconsc:union([hoconsc:ref(?MODULE, key_license), + hoconsc:ref(?MODULE, file_license)]), + #{desc => "TODO"})} + ]. fields(key_license) -> [ {key, #{type => string(), sensitive => true, %% so it's not logged desc => "Configure the license as a string" }} - ]; + | common_fields()]; fields(file_license) -> [ {file, #{type => string(), desc => "Path to the license file" }} + | common_fields()]. + +common_fields() -> + [ + {connection_low_watermark, #{type => emqx_schema:percent(), + default => "75%", desc => "" + }}, + {connection_high_watermark, #{type => emqx_schema:percent(), + default => "80%", desc => "" + }} ]. + +validations() -> + [ {check_license_watermark, fun check_license_watermark/1}]. + +check_license_watermark(Conf) -> + case hocon_maps:get("license.connection_low_watermark", Conf) of + undefined -> true; + Low -> + High = hocon_maps:get("license.connection_high_watermark", Conf), + case High =/= undefined andalso High > Low of + true -> true; + false -> {bad_license_watermark, #{high => High, low => Low}} + end + end. From 34fe5082c459aef23a1bd51e31cb4dbe5c32ff82 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 16 Feb 2022 12:29:47 +0800 Subject: [PATCH 2/6] fix(schema): schema global validations not working --- apps/emqx/src/emqx_schema.erl | 24 ++++++++++ apps/emqx_authz/src/emqx_authz_schema.erl | 44 ++++++++++++------- apps/emqx_conf/src/emqx_conf_schema.erl | 6 ++- .../emqx_license/src/emqx_license_checker.erl | 8 +++- 4 files changed, 62 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index eba7d2617..4f1d52865 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -57,6 +57,7 @@ -export([ validate_heap_size/1 , parse_user_lookup_fun/1 , validate_alarm_actions/1 + , validations/0 ]). % workaround: prevent being recognized as unused functions @@ -1602,6 +1603,29 @@ validate_tls_versions(Versions) -> Vs -> {error, {unsupported_ssl_versions, Vs}} end. +validations() -> + [{check_process_watermark, fun check_process_watermark/1} + ,{check_cpu_watermark, fun check_cpu_watermark/1} + ]. + +%% validations from emqx_conf_schema, we must filter other *_schema by undefined. +check_process_watermark(Conf) -> + check_watermark("sysmon.vm.process_low_watermark", "sysmon.vm.process_high_watermark", Conf). + +check_cpu_watermark(Conf) -> + check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf). + +check_watermark(LowKey, HighKey, Conf) -> + case hocon_maps:get(LowKey, Conf) of + undefined -> true; + Low -> + High = hocon_maps:get(HighKey, Conf), + case Low < High of + true -> true; + false -> {bad_watermark, #{LowKey => Low, HighKey => High}} + end + end. + str(A) when is_atom(A) -> atom_to_list(A); str(B) when is_binary(B) -> diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 779cc52ae..b5bee052f 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -205,27 +205,37 @@ transform_header_name(Headers) -> maps:put(K, V, Acc) end, #{}, Headers). -check_ssl_opts(Conf) - when Conf =:= #{} -> - true; check_ssl_opts(Conf) -> - case emqx_authz_http:parse_url(hocon_maps:get("config.url", Conf)) of - #{scheme := https} -> - case hocon_maps:get("config.ssl.enable", Conf) of - true -> ok; - false -> false - end; - #{scheme := http} -> - ok + case hocon_maps:get("config.url", Conf) of + undefined -> true; + Url -> + case emqx_authz_http:parse_url(Url) of + #{scheme := https} -> + case hocon_maps:get("config.ssl.enable", Conf) of + true -> true; + _ -> {error, ssl_not_enable} + end; + #{scheme := http} -> true; + Bad -> {bad_scheme, Url, Bad} + end end. -check_headers(Conf) - when Conf =:= #{} -> - true; check_headers(Conf) -> - Method = to_bin(hocon_maps:get("config.method", Conf)), - Headers = hocon_maps:get("config.headers", Conf), - Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)). + case hocon_maps:get("config.method", Conf) of + undefined -> true; + Method0 -> + Method = to_bin(Method0), + Headers = hocon_maps:get("config.headers", Conf), + case Method of + <<"post">> -> true; + _ when Headers =:= undefined -> true; + _ when is_list(Headers) -> + case lists:member(<<"content-type">>, Headers) of + false -> true; + true -> {Method0, do_not_include_content_type} + end + end + end. union_array(Item) when is_list(Item) -> hoconsc:array(hoconsc:union(Item)). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index d15e10136..aaf1966bd 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -36,7 +36,7 @@ file/0, cipher/0]). --export([namespace/0, roots/0, fields/1, translations/0, translation/1]). +-export([namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). %% Static apps which merge their configs into the merged emqx.conf @@ -103,6 +103,10 @@ roots() -> emqx_schema:roots(low) ++ lists:flatmap(fun roots/1, ?MERGED_CONFIGS). +validations() -> + hocon_schema:validations(emqx_schema) ++ + lists:flatmap(fun hocon_schema:validations/1, ?MERGED_CONFIGS). + fields("cluster") -> [ {"name", sc(atom(), diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index b8ddd2d42..e0d511181 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -122,8 +122,12 @@ ensure_check_expiry_timer(State) -> Ref = erlang:send_after(?EXPIRY_ALARM_CHECK_INTERVAL, self(), check_expiry_alarm), State#{expiry_alarm_timer => Ref}. -cancel_timer(#{Key := Ref}, Key) when is_reference(Ref) -> erlang:cancel_timer(Ref); -cancel_timer(_, _) -> ok. +cancel_timer(State, Key) -> + Ref = maps:get(Key, State), + case is_reference(Ref) of + true -> erlang:cancel_timer(Ref); + false -> ok + end. check_license(License) -> DaysLeft = days_left(License), From 98b2cd683bff23999cc83a85f2bed741056fb91c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 16 Feb 2022 16:50:28 +0800 Subject: [PATCH 3/6] fix: dialyzer warning --- lib-ee/emqx_license/src/emqx_license_checker.erl | 15 ++++++++------- .../emqx_license/src/emqx_license_resources.erl | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index e0d511181..bf02266d0 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -71,14 +71,14 @@ purge() -> init([LicenseFetcher, CheckInterval]) -> case LicenseFetcher() of {ok, License} -> - ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table, read_concurrency]), + ?LICENSE_TAB = ets:new(?LICENSE_TAB, [set, protected, named_table, {read_concurrency, true}]), #{} = check_license(License), State0 = ensure_check_license_timer(#{check_license_interval => CheckInterval, license => License}), State = ensure_check_expiry_timer(State0), {ok, State}; - {error, _} = Error -> - Error + {error, Reason} -> + {stop, Reason} end. handle_call({update, License}, _From, State) -> @@ -124,10 +124,11 @@ ensure_check_expiry_timer(State) -> cancel_timer(State, Key) -> Ref = maps:get(Key, State), - case is_reference(Ref) of - true -> erlang:cancel_timer(Ref); - false -> ok - end. + _ = case is_reference(Ref) of + true -> erlang:cancel_timer(Ref); + false -> ok + end, + ok. check_license(License) -> DaysLeft = days_left(License), diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl index 36988c9fa..79bbc113b 100644 --- a/lib-ee/emqx_license/src/emqx_license_resources.erl +++ b/lib-ee/emqx_license/src/emqx_license_resources.erl @@ -76,7 +76,7 @@ code_change(_OldVsn, State, _Extra) -> connection_quota_early_alarm() -> connection_quota_early_alarm(emqx_license_checker:limits()). -connection_quota_early_alarm(#{max_connections := Max}) when is_integer(Max) -> +connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Max) -> Count = connection_count(), Low = emqx_conf:get([license, connection_low_watermark], 0.75), High = emqx_conf:get([license, connection_high_watermark], 0.80), From d900418ce2af5e92e6cb676fd71072a672e795c9 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 16 Feb 2022 18:15:47 +0800 Subject: [PATCH 4/6] fix(license): fix maps:get/2 crash --- lib-ee/emqx_license/src/emqx_license_checker.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index bf02266d0..8007ef3ce 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -123,10 +123,9 @@ ensure_check_expiry_timer(State) -> State#{expiry_alarm_timer => Ref}. cancel_timer(State, Key) -> - Ref = maps:get(Key, State), - _ = case is_reference(Ref) of - true -> erlang:cancel_timer(Ref); - false -> ok + _ = case maps:find(Key, State) of + {ok, Ref} when is_reference(Ref) -> erlang:cancel_timer(Ref); + _ -> ok end, ok. From 7853eb75df7199feae3222223e7514eb52372f29 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Thu, 17 Feb 2022 15:05:22 +0800 Subject: [PATCH 5/6] fix(license): fix crash update license.watermark. --- lib-ee/emqx_license/src/emqx_license.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib-ee/emqx_license/src/emqx_license.erl b/lib-ee/emqx_license/src/emqx_license.erl index 412a1a1d3..d7a9fc252 100644 --- a/lib-ee/emqx_license/src/emqx_license.erl +++ b/lib-ee/emqx_license/src/emqx_license.erl @@ -94,6 +94,7 @@ check(_ConnInfo, AckProps) -> pre_config_update(_, Cmd, Conf) -> {ok, do_update(Cmd, Conf)}. +post_config_update(_Path, _Cmd, ignore, _Old, _AppEnvs) -> ok; post_config_update(_Path, _Cmd, NewConf, _Old, _AppEnvs) -> case read_license(NewConf) of {ok, License} -> @@ -131,7 +132,10 @@ do_update({key, Content}, _Conf) when is_binary(Content); is_list(Content) -> #{<<"key">> => Content}; {error, Reason} -> erlang:throw(Reason) - end. + end; +%% We don't do extra action when update license's watermark. +do_update(_Other, _Conf) -> + {ok, ignore}. check_max_clients_exceeded(MaxClients) -> emqx_license_resources:connection_count() > MaxClients * 1.1. From 001b209811a6a17a74e003c0ce45b3ac1dbeb448 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Thu, 17 Feb 2022 15:49:41 +0800 Subject: [PATCH 6/6] fix(license): don't do not lose watermark fields. --- lib-ee/emqx_license/src/emqx_license.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_license/src/emqx_license.erl b/lib-ee/emqx_license/src/emqx_license.erl index d7a9fc252..b4698aa0f 100644 --- a/lib-ee/emqx_license/src/emqx_license.erl +++ b/lib-ee/emqx_license/src/emqx_license.erl @@ -94,7 +94,6 @@ check(_ConnInfo, AckProps) -> pre_config_update(_, Cmd, Conf) -> {ok, do_update(Cmd, Conf)}. -post_config_update(_Path, _Cmd, ignore, _Old, _AppEnvs) -> ok; post_config_update(_Path, _Cmd, NewConf, _Old, _AppEnvs) -> case read_license(NewConf) of {ok, License} -> @@ -113,12 +112,12 @@ del_license_hook() -> _ = emqx_hooks:del('client.connect', {?MODULE, check, []}), ok. -do_update({file, Filename}, _Conf) -> +do_update({file, Filename}, Conf) -> case file:read_file(Filename) of {ok, Content} -> case emqx_license_parser:parse(Content) of {ok, _License} -> - #{<<"file">> => Filename}; + maps:remove(<<"key">>, Conf#{<<"file">> => Filename}); {error, Reason} -> erlang:throw(Reason) end; @@ -126,16 +125,16 @@ do_update({file, Filename}, _Conf) -> erlang:throw({invalid_license_file, Reason}) end; -do_update({key, Content}, _Conf) when is_binary(Content); is_list(Content) -> +do_update({key, Content}, Conf) when is_binary(Content); is_list(Content) -> case emqx_license_parser:parse(Content) of {ok, _License} -> - #{<<"key">> => Content}; + maps:remove(<<"file">>, Conf#{<<"key">> => Content}); {error, Reason} -> erlang:throw(Reason) end; %% We don't do extra action when update license's watermark. -do_update(_Other, _Conf) -> - {ok, ignore}. +do_update(_Other, Conf) -> + Conf. check_max_clients_exceeded(MaxClients) -> emqx_license_resources:connection_count() > MaxClients * 1.1.