diff --git a/apps/emqx_st_statistics/etc/emqx_st_statistics.conf b/apps/emqx_st_statistics/etc/emqx_st_statistics.conf new file mode 100644 index 000000000..88ca0c37f --- /dev/null +++ b/apps/emqx_st_statistics/etc/emqx_st_statistics.conf @@ -0,0 +1,38 @@ +##-------------------------------------------------------------------- +## EMQ X Slow Topics Statistics +##-------------------------------------------------------------------- + +## Threshold time of slow topics statistics +## +## Default: 10 seconds +st_statistics.threshold_time = 10S + +## Time window of slow topics statistics +## +## Value: 5 minutes +st_statistics.time_window = 5M + +## Maximum of slow topics log, log will clear when enter new time window +## +## Value: 500 +st_statistics.max_log_num = 500 + +## Top-K record for slow topics, update from logs +## +## Value: 500 +st_statistics.top_k_num = 500 + +## Topic of notification +## +## Defaut: $slow_topics +st_statistics.notice_topic = $slow_topics + +## QoS of notification message in notice topic +## +## Defaut: 0 +st_statistics.notice_qos = 0 + +## Maximum information number in one notification +## +## Default: 500 +st_statistics.notice_batch_size = 500 diff --git a/apps/emqx_st_statistics/priv/emqx_st_statistics.schema b/apps/emqx_st_statistics/priv/emqx_st_statistics.schema new file mode 100644 index 000000000..da9a2f810 --- /dev/null +++ b/apps/emqx_st_statistics/priv/emqx_st_statistics.schema @@ -0,0 +1,58 @@ +%%-*- mode: erlang -*- +%% st_statistics config mapping + +%% Threshold time of slow topics statistics +%% {$configurable} +{mapping, "st_statistics.threshold_time", "emqx_st_statistics.threshold_time", + [ + {default, "10S"}, + {datatype, [integer, {duration, ms}]} + ]}. + +%% Time window of slow topics statistics +%% {$configurable} +{mapping, "st_statistics.time_window", "emqx_st_statistics.time_window", + [ + {default, "5M"}, + {datatype, [integer, {duration, ms}]} + ]}. + +%% Maximum of slow topics log +%% {$configurable} +{mapping, "st_statistics.max_log_num", "emqx_st_statistics.max_log_num", + [ + {default, 500}, + {datatype, integer} + ]}. + +%% Top-K record for slow topics, update from logs +%% {$configurable} +{mapping, "st_statistics.top_k_num", "emqx_st_statistics.top_k_num", + [ + {default, 500}, + {datatype, integer} + ]}. + +%% Topic of notification +%% {$configurable} +{mapping, "st_statistics.notice_topic", "emqx_st_statistics.notice_topic", + [ + {default, <<"slow_topics">>}, + {datatype, string} + ]}. + +%% QoS of notification message in notice topic +%% {$configurable} +{mapping, "st_statistics.notice_qos", "emqx_st_statistics.notice_qos", + [ + {default, 0}, + {datatype, integer} + ]}. + +%% Maximum entities per notification message +%% {$configurable} +{mapping, "st_statistics.notice_batch_size", "emqx_st_statistics.notice_batch_size", + [ + {default, 500}, + {datatype, integer} + ]}. diff --git a/apps/emqx_st_statistics/rebar.config b/apps/emqx_st_statistics/rebar.config new file mode 100644 index 000000000..6433d92d6 --- /dev/null +++ b/apps/emqx_st_statistics/rebar.config @@ -0,0 +1,23 @@ +{deps, []}. + +{edoc_opts, [{preprocess, true}]}. +{erl_opts, [warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard + ]}. + +{xref_checks, [undefined_function_calls, undefined_functions, + locals_not_used, deprecated_function_calls, + warnings_as_errors, deprecated_functions]}. +{cover_enabled, true}. +{cover_opts, [verbose]}. +{cover_export_enabled, true}. + +{profiles, + [{test, + [{deps, + [{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}, + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}]} + ]} + ]}. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.app.src b/apps/emqx_st_statistics/src/emqx_st_statistics.app.src new file mode 100644 index 000000000..b1eb1612a --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics.app.src @@ -0,0 +1,12 @@ +{application, emqx_st_statistics, + [{description, "EMQ X Slow Topics Statistics"}, + {vsn, "1.0.0"}, % strict semver, bump manually! + {modules, []}, + {registered, [emqx_st_statistics_sup]}, + {applications, [kernel,stdlib]}, + {mod, {emqx_st_statistics_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, []} + ]}. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src b/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src new file mode 100644 index 000000000..dcf0d8cdd --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src @@ -0,0 +1,9 @@ +%% -*-: erlang -*- +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.erl b/apps/emqx_st_statistics/src/emqx_st_statistics.erl new file mode 100644 index 000000000..8a6c1535f --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics.erl @@ -0,0 +1,331 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics). + +-behaviour(gen_server). + +-include_lib("include/emqx.hrl"). +-include_lib("include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-logger_header("[SLOW TOPICS]"). + +-export([ start_link/1, on_publish_done/3, enable/0 + , disable/0 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-compile(nowarn_unused_type). + +-type state() :: #{ config := proplist:proplist() + , index := index_map() + , begin_time := pos_integer() + , counter := counters:counter_ref() + , enable := boolean() + }. + +-type log() :: #{ topic := emqx_types:topic() + , times := pos_integer() + , average := float() + }. + +-type window_log() :: #{ begin_time := pos_integer() + , logs := [log()] + }. + +-record(slow_log, { topic :: emqx_types:topic() + , times :: non_neg_integer() + , elapsed :: non_neg_integer() + }). + +-record(top_k, { key :: any() + , average :: float()}). + +-type message() :: #message{}. + +-import(proplists, [get_value/2]). + +-define(LOG_TAB, emqx_st_statistics_log). +-define(TOPK_TAB, emqx_st_statistics_topk). +-define(NOW, erlang:system_time(millisecond)). +-define(TOP_KEY(Times, Topic), {Times, Topic}). +-define(QUOTA_IDX, 1). + +-type top_key() :: ?TOP_KEY(pos_integer(), emqx_types:topic()). +-type index_map() :: #{emqx_types:topic() => pos_integer()}. + +%% erlang term order +%% number < atom < reference < fun < port < pid < tuple < list < bit string + +%% ets ordered_set is ascending by term order + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +%% @doc Start the st_statistics +-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()). +start_link(Env) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). + +-spec on_publish_done(message(), pos_integer(), counters:counters_ref()) -> ok. +on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) -> + case ?NOW - Timestamp of + Elapsed when Elapsed > Threshold -> + case get_log_quota(Counter) of + true -> + update_log(Msg, Elapsed); + _ -> + ok + end; + _ -> + ok + end. + +enable() -> + gen_server:call(?MODULE, {enable, true}). + +disable() -> + gen_server:call(?MODULE, {enable, false}). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Env]) -> + erlang:process_flag(trap_exit, true), + init_log_tab(Env), + init_topk_tab(Env), + notification_tick(Env), + Counter = counters:new(1, [write_concurrency]), + set_log_quota(Env, Counter), + Threshold = get_value(threshold_time, Env), + load(Threshold, Counter), + {ok, #{config => Env, + index => #{}, + begin_time => ?NOW, + counter => Counter, + enable => true}}. + +handle_call({enable, Enable}, _From, + #{config := Cfg, counter := Counter, enable := IsEnable} = State) -> + State2 = case Enable of + IsEnable -> + State; + true -> + Threshold = get_value(threshold_time, Cfg), + load(Threshold, Counter), + State#{enable := true}; + _ -> + unload(), + State#{enable := false} + end, + {reply, ok, State2}; + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info(notification_tick, #{config := Cfg} = State) -> + notification_tick(Cfg), + Index2 = do_notification(State), + {noreply, State#{index := Index2, + begin_time := ?NOW}}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _) -> + unload(), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +notification_tick(Env) -> + TimeWindow = get_value(time_window, Env), + erlang:send_after(TimeWindow, self(), ?FUNCTION_NAME). + +init_log_tab(_) -> + ?LOG_TAB = ets:new(?LOG_TAB, [ set, public, named_table + , {keypos, #slow_log.topic}, {write_concurrency, true} + , {read_concurrency, true} + ]). + +init_topk_tab(_) -> + ?TOPK_TAB = ets:new(?TOPK_TAB, [ ordered_set, protected, named_table + , {keypos, #top_k.key}, {write_concurrency, true} + , {read_concurrency, false} + ]). + +-spec get_log_quota(counter:counter_ref()) -> boolean(). +get_log_quota(Counter) -> + case counters:get(Counter, ?QUOTA_IDX) of + Quota when Quota > 0 -> + counters:sub(Counter, ?QUOTA_IDX, 1), + true; + _ -> + false + end. + +-spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok. +set_log_quota(Cfg, Counter) -> + MaxLogNum = get_value(max_log_num, Cfg), + counters:put(Counter, ?QUOTA_IDX, MaxLogNum). + +-spec update_log(message(), non_neg_integer()) -> ok. +update_log(#message{topic = Topic}, Elapsed) -> + _ = ets:update_counter(?LOG_TAB, + Topic, + [{#slow_log.times, 1}, {#slow_log.elapsed, Elapsed}], + #slow_log{topic = Topic, + times = 1, + elapsed = Elapsed}), + ok. + +-spec do_notification(state()) -> index_map(). +do_notification(#{begin_time := BeginTime, + config := Cfg, + index := IndexMap, + counter := Counter}) -> + Logs = ets:tab2list(?LOG_TAB), + ets:delete_all_objects(?LOG_TAB), + start_publish(Logs, BeginTime, Cfg), + set_log_quota(Cfg, Counter), + MaxRecord = get_value(top_k_num, Cfg), + Size = ets:info(?TOPK_TAB, size), + update_top_k(Logs, erlang:max(0, MaxRecord - Size), IndexMap). + +-spec update_top_k(list(#slow_log{}), non_neg_integer(), index_map()) -> index_map(). +update_top_k([#slow_log{topic = Topic, + times = NewTimes, + elapsed = Elapsed} = Log | T], + Left, + IndexMap) -> + case maps:get(Topic, IndexMap, 0) of + 0 -> + try_insert_new(Log, Left, T, IndexMap); + Times -> + [#top_k{key = Key, average = Average}] = ets:lookup(?TOPK_TAB, ?TOP_KEY(Times, Topic)), + Times2 = Times + NewTimes, + Total = Times * Average + Elapsed, + Average2 = Total / Times2, + ets:delete(?TOPK_TAB, Key), + ets:insert(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times2, Topic), average = Average2}), + update_top_k(T, Left, IndexMap#{Topic := Times2}) + end; + +update_top_k([], _, IndexMap) -> + IndexMap. + +-spec try_insert_new(#slow_log{}, + non_neg_integer(), list(#slow_log{}), index_map()) -> index_map(). +try_insert_new(#slow_log{topic = Topic, + times = Times, + elapsed = Elapsed}, Left, Logs, IndexMap) when Left > 0 -> + Average = Elapsed / Times, + ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}), + update_top_k(Logs, Left - 1, IndexMap#{Topic => Times}); + +try_insert_new(#slow_log{topic = Topic, + times = Times, + elapsed = Elapsed}, Left, Logs, IndexMap) -> + ?TOP_KEY(MinTimes, MinTopic) = MinKey = ets:first(?TOPK_TAB), + case MinTimes > Times of + true -> + update_top_k(Logs, Left, IndexMap); + _ -> + Average = Elapsed / Times, + ets:delete(?TOPK_TAB, MinKey), + ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}), + update_top_k(Logs, + Left - 1, + maps:put(Topic, Times, maps:remove(MinTopic, IndexMap))) + end. + +start_publish(Logs, BeginTime, Cfg) -> + emqx_pool:async_submit({fun do_publish/3, [Logs, BeginTime, Cfg]}). + +do_publish([], _, _) -> + ok; + +do_publish(Logs, BeginTime, Cfg) -> + BatchSize = get_value(notice_batch_size, Cfg), + do_publish(Logs, BatchSize, BeginTime, Cfg, []). + +do_publish([Log | T], Size, BeginTime, Cfg, Cache) when Size > 0 -> + Cache2 = [convert_to_notice(Log) | Cache], + do_publish(T, Size - 1, BeginTime, Cfg, Cache2); + +do_publish(Logs, Size, BeginTime, Cfg, Cache) when Size =:= 0 -> + publish(BeginTime, Cfg, Cache), + do_publish(Logs, BeginTime, Cfg); + +do_publish([], _, BeginTime, Cfg, Cache) -> + publish(BeginTime, Cfg, Cache), + ok. + +convert_to_notice(#slow_log{topic = Topic, + times = Times, + elapsed = Elapsed}) -> + #{topic => Topic, + times => Times, + average => Elapsed / Times}. + +publish(BeginTime, Cfg, Notices) -> + WindowLog = #{begin_time => BeginTime, + logs => Notices}, + Payload = emqx_json:encode(WindowLog), + _ = emqx:publish(#message{ id = emqx_guid:gen() + , qos = get_value(notice_qos, Cfg) + , from = ?MODULE + , topic = get_topic(Cfg) + , payload = Payload + , timestamp = ?NOW + }), + ok. + +load(Threshold, Counter) -> + _ = emqx:hook('message.publish_done', fun ?MODULE:on_publish_done/3, [Threshold, Counter]), + ok. + +unload() -> + emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). + +get_topic(Cfg) -> + case get_value(notice_topic, Cfg) of + Topic when is_binary(Topic) -> + Topic; + Topic -> + erlang:list_to_binary(Topic) + end. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl b/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl new file mode 100644 index 000000000..e5b62a00e --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics_app). + +-behaviour(application). + +-emqx_plugin(?MODULE). + +-export([ start/2 + , stop/1 + ]). + +start(_Type, _Args) -> + Env = application:get_all_env(emqx_st_statistics), + {ok, Sup} = emqx_st_statistics_sup:start_link(Env), + {ok, Sup}. + +stop(_State) -> + ok. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics_sup.erl b/apps/emqx_st_statistics/src/emqx_st_statistics_sup.erl new file mode 100644 index 000000000..cbff854ef --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics_sup.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics_sup). + +-behaviour(supervisor). + +-export([start_link/1]). + +-export([init/1]). + +start_link(Env) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). + +init([Env]) -> + {ok, {{one_for_one, 10, 3600}, + [#{id => st_statistics, + start => {emqx_st_statistics, start_link, [Env]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_st_statistics]}]}}. diff --git a/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl b/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl new file mode 100644 index 000000000..c08c8d986 --- /dev/null +++ b/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl @@ -0,0 +1,110 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("include/emqx.hrl"). + +%-define(LOGT(Format, Args), ct:pal(Format, Args)). + +-define(LOG_TAB, emqx_st_statistics_log). +-define(TOPK_TAB, emqx_st_statistics_topk). +-define(NOW, erlang:system_time(millisecond)). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([emqx_st_statistics], fun set_special_cfg/1), + Config. + +set_special_cfg(_) -> + application:set_env([{emqx_st_statistics, base_conf()}]), + ok. + +end_per_suite(Config) -> + emqx_ct_helpers:stop_apps([emqx_st_statistics]), + Config. + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- +t_log_and_pub(_) -> + %% Sub topic first + SubBase = "/test", + emqx:subscribe("$slow_topics"), + Clients = start_client(SubBase), + timer:sleep(1000), + Now = ?NOW, + %% publish + ?assert(ets:info(?LOG_TAB, size) =:= 0), + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("~s~p", [SubBase, I])), + Msg = emqx_message:make(Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 1000}) + end, + lists:seq(1, 10)), + + ?assert(ets:info(?LOG_TAB, size) =:= 5), + + timer:sleep(2400), + + ?assert(ets:info(?LOG_TAB, size) =:= 0), + ?assert(ets:info(?TOPK_TAB, size) =:= 3), + try_receive(3), + try_receive(2), + [Client ! stop || Client <- Clients], + ok. + +base_conf() -> + [{top_k_num, 3}, + {threshold_time, 10}, + {notice_qos, 0}, + {notice_batch_size, 3}, + {notice_topic,"$slow_topics"}, + {time_window, 2000}, + {max_log_num, 5}]. + +start_client(Base) -> + [spawn(fun() -> + Topic = list_to_binary(io_lib:format("~s~p", [Base, I])), + client(Topic) + end) + || I <- lists:seq(1, 10)]. + +client(Topic) -> + {ok, C} = emqtt:start_link([{host, "localhost"}, + {clientid, Topic}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + {ok, _} = emqtt:connect(C), + {ok, _, _} = emqtt:subscribe(C, Topic), + receive + stop -> + ok + end. + +try_receive(L) -> + receive + {deliver, _, #message{payload = Payload}} -> + #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]), + ?assertEqual(length(Logs), L) + after 500 -> + ?assert(false) + end. diff --git a/rebar.config.erl b/rebar.config.erl index 1000a2c92..38dcb681b 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -303,6 +303,7 @@ relx_plugin_apps(ReleaseType) -> , emqx_recon , emqx_rule_engine , emqx_sasl + , emqx_st_statistics ] ++ [emqx_telemetry || not is_enterprise()] ++ relx_plugin_apps_per_rel(ReleaseType) diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 596f7404a..7c2ea5eb2 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -5,38 +5,64 @@ latest_release=$(git describe --abbrev=0 --tags) bad_app_count=0 -while read -r app; do - if [ "$app" != "emqx" ]; then - app_path="$app" - else - app_path="." - fi - src_file="$app_path/src/$(basename "$app").app.src" - old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"')" - now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') - if [ "$old_app_version" = "$now_app_version" ]; then - changed="$(git diff --name-only "$latest_release"...HEAD \ - -- "$app_path/src" \ - -- "$app_path/priv" \ - -- "$app_path/c_src" | { grep -v -E 'appup\.src' || true; } | wc -l)" - if [ "$changed" -gt 0 ]; then - echo "$src_file needs a vsn bump" - bad_app_count=$(( bad_app_count + 1)) - elif [[ ${app_path} = *emqx_dashboard* ]]; then - ## emqx_dashboard is ensured to be upgraded after all other plugins - ## at the end of its appup instructions, there is the final instruction - ## {apply, {emqx_plugins, load, []} - ## since we don't know which plugins are stopped during the upgrade - ## for safty, we just force a dashboard version bump for each and every release - ## even if there is nothing changed in the app - echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade" - bad_app_count=$(( bad_app_count + 1)) +get_vsn() { + commit="$1" + app_src_file="$2" + if [ "$commit" = 'HEAD' ]; then + if [ -f "$app_src_file" ]; then + grep vsn "$app_src_file" | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true fi + else + git show "$commit":"$app_src_file" 2>/dev/null | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true fi -done < <(./scripts/find-apps.sh) +} -if [ $bad_app_count -gt 0 ]; then - exit 1 -else - echo "apps version check successfully" -fi +check_apps() { + while read -r app_path; do + app=$(basename "$app_path") + src_file="$app_path/src/$app.app.src" + old_app_version="$(get_vsn "$latest_release" "$src_file")" + ## TODO: delete it after new version is released with emqx app in apps dir + if [ "$app" = 'emqx' ] && [ "$old_app_version" = '' ]; then + old_app_version="$(get_vsn "$latest_release" 'src/emqx.app.src')" + fi + now_app_version="$(get_vsn 'HEAD' "$src_file")" + ## TODO: delete it after new version is released with emqx app in apps dir + if [ "$app" = 'emqx' ] && [ "$now_app_version" = '' ]; then + now_app_version="$(get_vsn 'HEAD' 'src/emqx.app.src')" + fi + if [ -z "$now_app_version" ]; then + echo "failed_to_get_new_app_vsn for $app" + exit 1 + fi + if [ -z "${old_app_version:-}" ]; then + echo "skiped checking new app ${app}" + elif [ "$old_app_version" = "$now_app_version" ]; then + lines="$(git diff --name-only "$latest_release"...HEAD \ + -- "$app_path/src" \ + -- "$app_path/priv" \ + -- "$app_path/c_src")" + if [ "$lines" != '' ]; then + echo "$src_file needs a vsn bump (old=$old_app_version)" + echo "changed: $lines" + bad_app_count=$(( bad_app_count + 1)) + fi + fi + done < <(./scripts/find-apps.sh) + + if [ $bad_app_count -gt 0 ]; then + exit 1 + else + echo "apps version check successfully" + fi +} + +_main() { + if echo "${latest_release}" |grep -oE '[0-9]+.[0-9]+.[0-9]+' > /dev/null 2>&1; then + check_apps + else + echo "skiped unstable tag: ${latest_release}" + fi +} + +_main diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9463345d4..db09bf8c9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -319,6 +319,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> + emqx:run_hook('message.publish_done', [Msg]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> @@ -343,6 +344,8 @@ return_with(Msg, {ok, Publishes, Session}) -> pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> + %% execute hook here, because message record will be replaced by pubrel + emqx:run_hook('message.publish_done', [Msg]), Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> @@ -439,11 +442,12 @@ deliver([Msg | More], Acc, Session) -> end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> + emqx:run_hook('message.publish_done', [Msg]), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session = - #session{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> + #session{next_pkt_id = PacketId, inflight = Inflight}) + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of @@ -696,4 +700,3 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). -