Merge remote-tracking branch 'ce/main-v4.3' into merge-main-v4.3-into-v4.4

This commit is contained in:
JianBo He 2022-05-26 11:18:15 +08:00
commit c5349cef64
7 changed files with 240 additions and 65 deletions

View File

@ -58,7 +58,7 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
]}.

View File

@ -168,8 +168,8 @@ find_appup_actions(App,
OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0),
UpDiff = diff_app(up, App, CurrAppIdx, PrevAppIdx),
DownDiff = diff_app(down, App, PrevAppIdx, CurrAppIdx),
Upgrade = merge_update_actions(App, UpDiff, OldUpgrade),
Downgrade = merge_update_actions(App, DownDiff, OldDowngrade),
Upgrade = merge_update_actions(App, UpDiff, OldUpgrade, PrevVersion),
Downgrade = merge_update_actions(App, DownDiff, OldDowngrade, PrevVersion),
case OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade of
true -> [];
false -> [{App, {Upgrade, Downgrade, OldUpgrade, OldDowngrade}}]
@ -258,14 +258,40 @@ find_base_appup_actions(App, PrevVersion) ->
end,
{ensure_version(PrevVersion, Upgrade), ensure_version(PrevVersion, Downgrade)}.
merge_update_actions(App, Changes, Vsns) ->
merge_update_actions(App, Changes, Vsns, PrevVersion) ->
lists:map(fun(Ret = {<<".*">>, _}) ->
Ret;
({Vsn, Actions}) ->
{Vsn, do_merge_update_actions(App, Changes, Actions)}
case is_skipped_version(App, Vsn, PrevVersion) of
true ->
log("WARN: ~p has version ~s skipped over?~n", [App, Vsn]),
{Vsn, Actions};
false ->
{Vsn, do_merge_update_actions(App, Changes, Actions)}
end
end,
Vsns).
%% say current version is 1.1.3, and the compare base is version 1.1.1,
%% but there is a 1.1.2 in appup we may skip merging instructions for
%% 1.1.2 because it's not used and no way to know what has been changed
is_skipped_version(App, Vsn, PrevVersion) when is_list(Vsn) andalso is_list(PrevVersion) ->
case is_app_external(App) andalso parse_version_number(Vsn) of
{ok, VsnTuple} ->
case parse_version_number(PrevVersion) of
{ok, PrevVsnTuple} ->
VsnTuple > PrevVsnTuple;
_ ->
false
end;
_ ->
false
end;
is_skipped_version(_App, _Vsn, _PrevVersion) ->
%% if app version is a regexp, we don't know for sure
%% return 'false' to be on the safe side
false.
do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
AppSpecific = app_specific_actions(App) -- OldActions,
AlreadyHandled = lists:flatten(lists:map(fun process_old_action/1, OldActions)),

View File

@ -8,6 +8,9 @@
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_auth_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -17,6 +20,9 @@
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_auth_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -31,6 +37,9 @@
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_auth_stats_from_ets_to_counter,[]}},
{load_module,emqx,brutal_purge,soft_purge,[]},
@ -50,13 +59,15 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -79,9 +90,7 @@
{apply,{emqx_metrics,assign_auth_stats_from_ets_to_counter,[]}},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
@ -94,6 +103,9 @@
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
@ -104,6 +116,9 @@
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -115,6 +130,9 @@
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
@ -133,13 +151,15 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{delete_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -159,9 +179,7 @@
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},

View File

@ -39,6 +39,8 @@
, deactivate/1
, deactivate/2
, delete_all_deactivated_alarms/0
, ensure_deactivated/1
, ensure_deactivated/2
, get_alarms/0
, get_alarms/1
]).
@ -132,6 +134,24 @@ activate(Name) ->
activate(Name, Details) ->
gen_server:call(?MODULE, {activate_alarm, Name, Details}).
-spec ensure_deactivated(binary() | atom()) -> ok.
ensure_deactivated(Name) ->
ensure_deactivated(Name, no_details).
-spec ensure_deactivated(binary() | atom(), atom() | map()) -> ok.
ensure_deactivated(Name, Data) ->
%% this duplicates the dirty read in handle_call,
%% intention is to avoid making gen_server calls when there is no alarm
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[] ->
ok;
_ ->
case deactivate(Name, Data) of
{error, not_found} -> ok;
Other -> Other
end
end.
deactivate(Name) ->
gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}).

View File

@ -56,21 +56,12 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) ->
init(_) ->
{ok, []}.
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}),
{ok, State};
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)),
high_watermark => emqx_os_mon:get_procmem_high_watermark()}),
{ok, State};
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_system_memory_usage),
{ok, State};
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:deactivate(high_process_memory_usage),
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
emqx_alarm:ensure_deactivate(high_process_memory_usage),
{ok, State};
handle_event(_, State) ->

View File

@ -78,32 +78,29 @@ set_cpu_low_watermark(Float) ->
call({set_cpu_low_watermark, Float}).
get_mem_check_interval() ->
memsup:get_check_interval() div 1000.
call(?FUNCTION_NAME).
set_mem_check_interval(Seconds) when Seconds < 60 ->
memsup:set_check_interval(1);
set_mem_check_interval(Seconds) ->
memsup:set_check_interval(Seconds div 60).
call({?FUNCTION_NAME, Seconds}).
get_sysmem_high_watermark() ->
memsup:get_sysmem_high_watermark().
call(?FUNCTION_NAME).
set_sysmem_high_watermark(Float) ->
V = Float/100,
set_sysmem_high_watermark(HW) ->
case load_ctl:get_config() of
#{ ?MEM_MON_F0 := true } = OldLC ->
ok = load_ctl:put_config(OldLC#{ ?MEM_MON_F0 => true
, ?MEM_MON_F1 => V});
, ?MEM_MON_F1 => HW / 100});
_ ->
skip
end,
memsup:set_sysmem_high_watermark(V).
gen_server:call(?OS_MON, {?FUNCTION_NAME, HW}, infinity).
get_procmem_high_watermark() ->
memsup:get_procmem_high_watermark().
set_procmem_high_watermark(Float) ->
memsup:set_procmem_high_watermark(Float / 100).
set_procmem_high_watermark(HW) ->
memsup:set_procmem_high_watermark(HW / 100).
call(Req) ->
gen_server:call(?OS_MON, Req, infinity).
@ -113,16 +110,38 @@ call(Req) ->
%%--------------------------------------------------------------------
init([Opts]) ->
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts)),
SysHW = proplists:get_value(sysmem_high_watermark, Opts),
set_sysmem_high_watermark(SysHW),
process_flag(trap_exit, true),
%% make sure memsup will not emit system memory alarms
memsup:set_sysmem_high_watermark(1),
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts)),
ensure_system_memory_alarm(SysHW),
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts),
MemCheckInterval = do_resolve_mem_check_interval(proplists:get_value(mem_check_interval, Opts)),
SysHW = proplists:get_value(sysmem_high_watermark, Opts),
St = ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts),
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts),
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts),
timer => undefined})}.
sysmem_high_watermark => SysHW,
mem_check_interval => MemCheckInterval,
timer => undefined}),
ok = do_set_mem_check_interval(MemCheckInterval),
%% update immediately after start/restart
ok = update_mem_alarm_status(SysHW),
{ok, ensure_mem_check_timer(St)}.
handle_call(get_sysmem_high_watermark, _From, State) ->
#{sysmem_high_watermark := SysHW} = State,
{reply, maybe_round(SysHW), State};
handle_call(get_mem_check_interval, _From, State) ->
#{mem_check_interval := Interval} = State,
{reply, Interval, State};
handle_call({set_sysmem_high_watermark, SysHW}, _From, State) ->
%% update immediately after start/restart
ok = update_mem_alarm_status(SysHW),
{reply, ok, State#{sysmem_high_watermark => SysHW}};
handle_call({set_mem_check_interval, Seconds0}, _From, State) ->
Seconds = do_resolve_mem_check_interval(Seconds0),
ok = do_set_mem_check_interval(Seconds),
%% will start taking effect when the current timer expires
{reply, ok, State#{mem_check_interval => Seconds}};
handle_call(get_cpu_check_interval, _From, State) ->
{reply, maps:get(cpu_check_interval, State, undefined), State};
@ -168,16 +187,28 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
ensure_check_timer(State)
end,
{noreply, NState};
handle_info({timeout, Timer, check_mem}, #{mem_check_timer := Timer,
sysmem_high_watermark := SysHW
} = State) ->
ok = update_mem_alarm_status(SysHW),
NState = ensure_mem_check_timer(State#{mem_check_timer := undefined}),
{noreply, NState};
handle_info(Info, State) ->
?LOG(error, "unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{timer := Timer}) ->
terminate(_Reason, #{timer := Timer} = St) ->
emqx_misc:cancel_timer(maps:get(mem_check_timer, St, undefined)),
emqx_misc:cancel_timer(Timer).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% NOTE: downgrade is not handled as the extra fields added to State
%% does not affect old version code.
%% The only thing which may slip through is that a started timer
%% will result in a "unexpected info" error log for the old version code
NewState = ensure_mem_check_timer(State),
SysHW = resolve_sysmem_high_watermark(State),
{ok, NewState#{sysmem_high_watermark => SysHW}}.
%%--------------------------------------------------------------------
%% Internal functions
@ -189,19 +220,99 @@ ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
end.
%% At startup, memsup starts first and checks for memory alarms,
%% but emqx_alarm_handler is not yet used instead of alarm_handler,
%% so alarm_handler is used directly for notification (normally emqx_alarm_handler should be used).
%%The internal memsup will no longer trigger events that have been alerted,
%% and there is no exported function to remove the alerted flag,
%% so it can only be checked again at startup.
ensure_system_memory_alarm(HW) ->
case erlang:whereis(memsup) of
undefined -> ok;
_Pid ->
{Total, Allocated, _Worst} = memsup:get_memory_data(),
case Total =/= 0 andalso Allocated/Total * 100 > HW of
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
false -> ok
end
ensure_mem_check_timer(#{mem_check_timer := Ref} = State) when is_reference(Ref) ->
%% timer already started
State;
ensure_mem_check_timer(State) ->
Interval = resolve_mem_check_interval(State),
case is_sysmem_check_supported() of
true ->
State#{mem_check_timer => emqx_misc:start_timer(timer:seconds(Interval), check_mem),
mem_check_interval => Interval
};
false ->
State#{mem_check_timer => undefined,
mem_check_interval => Interval
}
end.
resolve_mem_check_interval(#{mem_check_interval := Seconds}) when is_integer(Seconds) ->
Seconds;
resolve_mem_check_interval(_) ->
%% this only happens when hot-upgrade from older version (< 4.3.14, or < 4.4.4)
try
%% memsup has interval set API using minutes, but returns in milliseconds from get API
IntervalMs = memsup:get_check_interval(),
true = (IntervalMs > 1000),
IntervalMs div 1000
catch
_ : _ ->
%% this is the memsup default
60
end.
is_sysmem_check_supported() ->
%% sorry Mac and Windows, for now
{unix, linux} =:= os:type().
%% we still need to set memsup interval for process (not system) memory check
do_set_mem_check_interval(Seconds) ->
Minutes = Seconds div 60,
_ = memsup:set_check_interval(Minutes),
ok.
%% keep the time unit alignment with memsup, minmum interval is 60 seconds.
do_resolve_mem_check_interval(Seconds) ->
case is_integer(Seconds) andalso Seconds >= 60 of
true -> Seconds;
false -> 60
end.
resolve_sysmem_high_watermark(#{sysmem_high_watermark := SysHW}) -> SysHW;
resolve_sysmem_high_watermark(_) ->
%% sysmem_high_watermark is not found in state map
%% get it from memsup
memsup:get_sysmem_high_watermark().
update_mem_alarm_status(SysHW) ->
case is_sysmem_check_supported() of
true ->
do_update_mem_alarm_status(SysHW);
false ->
%% in case the old alarm is activated
ok = emqx_alarm:ensure_deactivated(high_system_memory_usage, #{reason => disabled})
end.
do_update_mem_alarm_status(SysHW) ->
Usage = current_sysmem_percent(),
case Usage > SysHW of
true ->
_ = emqx_alarm:activate(
high_system_memory_usage,
#{
usage => Usage,
high_watermark => SysHW
}
);
_ ->
ok = emqx_alarm:ensure_deactivated(
high_system_memory_usage,
#{
usage => Usage,
high_watermark => SysHW
}
)
end,
ok.
current_sysmem_percent() ->
Ratio = load_ctl:get_memory_usage(),
erlang:floor(Ratio * 10000) / 100.
maybe_round(X) when is_integer(X) -> X;
maybe_round(X) when is_float(X) ->
R = erlang:round(X),
case erlang:abs(X - R) > 1.0e-6 of
true -> X;
false -> R
end.

View File

@ -43,14 +43,23 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]),
application:stop(os_mon).
% t_set_mem_check_interval(_) ->
% error('TODO').
t_set_mem_check_interval(_) ->
emqx_os_mon:set_mem_check_interval(0),
?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
emqx_os_mon:set_mem_check_interval(61),
?assertEqual(61, emqx_os_mon:get_mem_check_interval()),
ok.
% t_set_sysmem_high_watermark(_) ->
% error('TODO').
% t_set_procmem_high_watermark(_) ->
% error('TODO').
t_set_sysmem_high_watermark(_) ->
emqx_os_mon:set_sysmem_high_watermark(10),
?assertEqual(10, emqx_os_mon:get_sysmem_high_watermark()),
emqx_os_mon:set_sysmem_high_watermark(100),
?assertEqual(100, emqx_os_mon:get_sysmem_high_watermark()),
emqx_os_mon:set_sysmem_high_watermark(90),
?assertEqual(90, emqx_os_mon:get_sysmem_high_watermark()),
emqx_os_mon:set_sysmem_high_watermark(93.2),
?assertEqual(93.2, emqx_os_mon:get_sysmem_high_watermark()),
ok.
t_api(_) ->
?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),