feat: add slow topics statistics plugin

This commit is contained in:
lafirest 2021-10-27 11:43:05 +08:00
parent aa90177302
commit f8acb31f89
12 changed files with 714 additions and 35 deletions

View File

@ -0,0 +1,38 @@
##--------------------------------------------------------------------
## EMQ X Slow Topics Statistics
##--------------------------------------------------------------------
## Threshold time of slow topics statistics
##
## Default: 10 seconds
st_statistics.threshold_time = 10S
## Time window of slow topics statistics
##
## Value: 5 minutes
st_statistics.time_window = 5M
## Maximum of slow topics log, log will clear when enter new time window
##
## Value: 500
st_statistics.max_log_num = 500
## Top-K record for slow topics, update from logs
##
## Value: 500
st_statistics.top_k_num = 500
## Topic of notification
##
## Defaut: $slow_topics
st_statistics.notice_topic = $slow_topics
## QoS of notification message in notice topic
##
## Defaut: 0
st_statistics.notice_qos = 0
## Maximum information number in one notification
##
## Default: 500
st_statistics.notice_batch_size = 500

View File

@ -0,0 +1,58 @@
%%-*- mode: erlang -*-
%% st_statistics config mapping
%% Threshold time of slow topics statistics
%% {$configurable}
{mapping, "st_statistics.threshold_time", "emqx_st_statistics.threshold_time",
[
{default, "10S"},
{datatype, [integer, {duration, ms}]}
]}.
%% Time window of slow topics statistics
%% {$configurable}
{mapping, "st_statistics.time_window", "emqx_st_statistics.time_window",
[
{default, "5M"},
{datatype, [integer, {duration, ms}]}
]}.
%% Maximum of slow topics log
%% {$configurable}
{mapping, "st_statistics.max_log_num", "emqx_st_statistics.max_log_num",
[
{default, 500},
{datatype, integer}
]}.
%% Top-K record for slow topics, update from logs
%% {$configurable}
{mapping, "st_statistics.top_k_num", "emqx_st_statistics.top_k_num",
[
{default, 500},
{datatype, integer}
]}.
%% Topic of notification
%% {$configurable}
{mapping, "st_statistics.notice_topic", "emqx_st_statistics.notice_topic",
[
{default, <<"slow_topics">>},
{datatype, string}
]}.
%% QoS of notification message in notice topic
%% {$configurable}
{mapping, "st_statistics.notice_qos", "emqx_st_statistics.notice_qos",
[
{default, 0},
{datatype, integer}
]}.
%% Maximum entities per notification message
%% {$configurable}
{mapping, "st_statistics.notice_batch_size", "emqx_st_statistics.notice_batch_size",
[
{default, 500},
{datatype, integer}
]}.

View File

@ -0,0 +1,23 @@
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}]}
]}
]}.

View File

@ -0,0 +1,12 @@
{application, emqx_st_statistics,
[{description, "EMQ X Slow Topics Statistics"},
{vsn, "1.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_st_statistics_sup]},
{applications, [kernel,stdlib]},
{mod, {emqx_st_statistics_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, []}
]}.

View File

@ -0,0 +1,9 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,331 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics).
-behaviour(gen_server).
-include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/3, enable/0
, disable/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-compile(nowarn_unused_type).
-type state() :: #{ config := proplist:proplist()
, index := index_map()
, begin_time := pos_integer()
, counter := counters:counter_ref()
, enable := boolean()
}.
-type log() :: #{ topic := emqx_types:topic()
, times := pos_integer()
, average := float()
}.
-type window_log() :: #{ begin_time := pos_integer()
, logs := [log()]
}.
-record(slow_log, { topic :: emqx_types:topic()
, times :: non_neg_integer()
, elapsed :: non_neg_integer()
}).
-record(top_k, { key :: any()
, average :: float()}).
-type message() :: #message{}.
-import(proplists, [get_value/2]).
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(NOW, erlang:system_time(millisecond)).
-define(TOP_KEY(Times, Topic), {Times, Topic}).
-define(QUOTA_IDX, 1).
-type top_key() :: ?TOP_KEY(pos_integer(), emqx_types:topic()).
-type index_map() :: #{emqx_types:topic() => pos_integer()}.
%% erlang term order
%% number < atom < reference < fun < port < pid < tuple < list < bit string
%% ets ordered_set is ascending by term order
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Start the st_statistics
-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()).
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
-spec on_publish_done(message(), pos_integer(), counters:counters_ref()) -> ok.
on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) ->
case ?NOW - Timestamp of
Elapsed when Elapsed > Threshold ->
case get_log_quota(Counter) of
true ->
update_log(Msg, Elapsed);
_ ->
ok
end;
_ ->
ok
end.
enable() ->
gen_server:call(?MODULE, {enable, true}).
disable() ->
gen_server:call(?MODULE, {enable, false}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Env]) ->
erlang:process_flag(trap_exit, true),
init_log_tab(Env),
init_topk_tab(Env),
notification_tick(Env),
Counter = counters:new(1, [write_concurrency]),
set_log_quota(Env, Counter),
Threshold = get_value(threshold_time, Env),
load(Threshold, Counter),
{ok, #{config => Env,
index => #{},
begin_time => ?NOW,
counter => Counter,
enable => true}}.
handle_call({enable, Enable}, _From,
#{config := Cfg, counter := Counter, enable := IsEnable} = State) ->
State2 = case Enable of
IsEnable ->
State;
true ->
Threshold = get_value(threshold_time, Cfg),
load(Threshold, Counter),
State#{enable := true};
_ ->
unload(),
State#{enable := false}
end,
{reply, ok, State2};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(notification_tick, #{config := Cfg} = State) ->
notification_tick(Cfg),
Index2 = do_notification(State),
{noreply, State#{index := Index2,
begin_time := ?NOW}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _) ->
unload(),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
notification_tick(Env) ->
TimeWindow = get_value(time_window, Env),
erlang:send_after(TimeWindow, self(), ?FUNCTION_NAME).
init_log_tab(_) ->
?LOG_TAB = ets:new(?LOG_TAB, [ set, public, named_table
, {keypos, #slow_log.topic}, {write_concurrency, true}
, {read_concurrency, true}
]).
init_topk_tab(_) ->
?TOPK_TAB = ets:new(?TOPK_TAB, [ ordered_set, protected, named_table
, {keypos, #top_k.key}, {write_concurrency, true}
, {read_concurrency, false}
]).
-spec get_log_quota(counter:counter_ref()) -> boolean().
get_log_quota(Counter) ->
case counters:get(Counter, ?QUOTA_IDX) of
Quota when Quota > 0 ->
counters:sub(Counter, ?QUOTA_IDX, 1),
true;
_ ->
false
end.
-spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok.
set_log_quota(Cfg, Counter) ->
MaxLogNum = get_value(max_log_num, Cfg),
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
-spec update_log(message(), non_neg_integer()) -> ok.
update_log(#message{topic = Topic}, Elapsed) ->
_ = ets:update_counter(?LOG_TAB,
Topic,
[{#slow_log.times, 1}, {#slow_log.elapsed, Elapsed}],
#slow_log{topic = Topic,
times = 1,
elapsed = Elapsed}),
ok.
-spec do_notification(state()) -> index_map().
do_notification(#{begin_time := BeginTime,
config := Cfg,
index := IndexMap,
counter := Counter}) ->
Logs = ets:tab2list(?LOG_TAB),
ets:delete_all_objects(?LOG_TAB),
start_publish(Logs, BeginTime, Cfg),
set_log_quota(Cfg, Counter),
MaxRecord = get_value(top_k_num, Cfg),
Size = ets:info(?TOPK_TAB, size),
update_top_k(Logs, erlang:max(0, MaxRecord - Size), IndexMap).
-spec update_top_k(list(#slow_log{}), non_neg_integer(), index_map()) -> index_map().
update_top_k([#slow_log{topic = Topic,
times = NewTimes,
elapsed = Elapsed} = Log | T],
Left,
IndexMap) ->
case maps:get(Topic, IndexMap, 0) of
0 ->
try_insert_new(Log, Left, T, IndexMap);
Times ->
[#top_k{key = Key, average = Average}] = ets:lookup(?TOPK_TAB, ?TOP_KEY(Times, Topic)),
Times2 = Times + NewTimes,
Total = Times * Average + Elapsed,
Average2 = Total / Times2,
ets:delete(?TOPK_TAB, Key),
ets:insert(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times2, Topic), average = Average2}),
update_top_k(T, Left, IndexMap#{Topic := Times2})
end;
update_top_k([], _, IndexMap) ->
IndexMap.
-spec try_insert_new(#slow_log{},
non_neg_integer(), list(#slow_log{}), index_map()) -> index_map().
try_insert_new(#slow_log{topic = Topic,
times = Times,
elapsed = Elapsed}, Left, Logs, IndexMap) when Left > 0 ->
Average = Elapsed / Times,
ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}),
update_top_k(Logs, Left - 1, IndexMap#{Topic => Times});
try_insert_new(#slow_log{topic = Topic,
times = Times,
elapsed = Elapsed}, Left, Logs, IndexMap) ->
?TOP_KEY(MinTimes, MinTopic) = MinKey = ets:first(?TOPK_TAB),
case MinTimes > Times of
true ->
update_top_k(Logs, Left, IndexMap);
_ ->
Average = Elapsed / Times,
ets:delete(?TOPK_TAB, MinKey),
ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}),
update_top_k(Logs,
Left - 1,
maps:put(Topic, Times, maps:remove(MinTopic, IndexMap)))
end.
start_publish(Logs, BeginTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/3, [Logs, BeginTime, Cfg]}).
do_publish([], _, _) ->
ok;
do_publish(Logs, BeginTime, Cfg) ->
BatchSize = get_value(notice_batch_size, Cfg),
do_publish(Logs, BatchSize, BeginTime, Cfg, []).
do_publish([Log | T], Size, BeginTime, Cfg, Cache) when Size > 0 ->
Cache2 = [convert_to_notice(Log) | Cache],
do_publish(T, Size - 1, BeginTime, Cfg, Cache2);
do_publish(Logs, Size, BeginTime, Cfg, Cache) when Size =:= 0 ->
publish(BeginTime, Cfg, Cache),
do_publish(Logs, BeginTime, Cfg);
do_publish([], _, BeginTime, Cfg, Cache) ->
publish(BeginTime, Cfg, Cache),
ok.
convert_to_notice(#slow_log{topic = Topic,
times = Times,
elapsed = Elapsed}) ->
#{topic => Topic,
times => Times,
average => Elapsed / Times}.
publish(BeginTime, Cfg, Notices) ->
WindowLog = #{begin_time => BeginTime,
logs => Notices},
Payload = emqx_json:encode(WindowLog),
_ = emqx:publish(#message{ id = emqx_guid:gen()
, qos = get_value(notice_qos, Cfg)
, from = ?MODULE
, topic = get_topic(Cfg)
, payload = Payload
, timestamp = ?NOW
}),
ok.
load(Threshold, Counter) ->
_ = emqx:hook('message.publish_done', fun ?MODULE:on_publish_done/3, [Threshold, Counter]),
ok.
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
get_topic(Cfg) ->
case get_value(notice_topic, Cfg) of
Topic when is_binary(Topic) ->
Topic;
Topic ->
erlang:list_to_binary(Topic)
end.

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-export([ start/2
, stop/1
]).
start(_Type, _Args) ->
Env = application:get_all_env(emqx_st_statistics),
{ok, Sup} = emqx_st_statistics_sup:start_link(Env),
{ok, Sup}.
stop(_State) ->
ok.

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_sup).
-behaviour(supervisor).
-export([start_link/1]).
-export([init/1]).
start_link(Env) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]).
init([Env]) ->
{ok, {{one_for_one, 10, 3600},
[#{id => st_statistics,
start => {emqx_st_statistics, start_link, [Env]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_st_statistics]}]}}.

View File

@ -0,0 +1,110 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("include/emqx.hrl").
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(NOW, erlang:system_time(millisecond)).
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_st_statistics], fun set_special_cfg/1),
Config.
set_special_cfg(_) ->
application:set_env([{emqx_st_statistics, base_conf()}]),
ok.
end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_st_statistics]),
Config.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_log_and_pub(_) ->
%% Sub topic first
SubBase = "/test",
emqx:subscribe("$slow_topics"),
Clients = start_client(SubBase),
timer:sleep(1000),
Now = ?NOW,
%% publish
?assert(ets:info(?LOG_TAB, size) =:= 0),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("~s~p", [SubBase, I])),
Msg = emqx_message:make(Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 1000})
end,
lists:seq(1, 10)),
?assert(ets:info(?LOG_TAB, size) =:= 5),
timer:sleep(2400),
?assert(ets:info(?LOG_TAB, size) =:= 0),
?assert(ets:info(?TOPK_TAB, size) =:= 3),
try_receive(3),
try_receive(2),
[Client ! stop || Client <- Clients],
ok.
base_conf() ->
[{top_k_num, 3},
{threshold_time, 10},
{notice_qos, 0},
{notice_batch_size, 3},
{notice_topic,"$slow_topics"},
{time_window, 2000},
{max_log_num, 5}].
start_client(Base) ->
[spawn(fun() ->
Topic = list_to_binary(io_lib:format("~s~p", [Base, I])),
client(Topic)
end)
|| I <- lists:seq(1, 10)].
client(Topic) ->
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, Topic},
{username, <<"plain">>},
{password, <<"plain">>}]),
{ok, _} = emqtt:connect(C),
{ok, _, _} = emqtt:subscribe(C, Topic),
receive
stop ->
ok
end.
try_receive(L) ->
receive
{deliver, _, #message{payload = Payload}} ->
#{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]),
?assertEqual(length(Logs), L)
after 500 ->
?assert(false)
end.

View File

@ -303,6 +303,7 @@ relx_plugin_apps(ReleaseType) ->
, emqx_recon
, emqx_rule_engine
, emqx_sasl
, emqx_st_statistics
]
++ [emqx_telemetry || not is_enterprise()]
++ relx_plugin_apps_per_rel(ReleaseType)

View File

@ -5,38 +5,64 @@ latest_release=$(git describe --abbrev=0 --tags)
bad_app_count=0
while read -r app; do
if [ "$app" != "emqx" ]; then
app_path="$app"
else
app_path="."
fi
src_file="$app_path/src/$(basename "$app").app.src"
old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"')"
now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
if [ "$old_app_version" = "$now_app_version" ]; then
changed="$(git diff --name-only "$latest_release"...HEAD \
-- "$app_path/src" \
-- "$app_path/priv" \
-- "$app_path/c_src" | { grep -v -E 'appup\.src' || true; } | wc -l)"
if [ "$changed" -gt 0 ]; then
echo "$src_file needs a vsn bump"
bad_app_count=$(( bad_app_count + 1))
elif [[ ${app_path} = *emqx_dashboard* ]]; then
## emqx_dashboard is ensured to be upgraded after all other plugins
## at the end of its appup instructions, there is the final instruction
## {apply, {emqx_plugins, load, []}
## since we don't know which plugins are stopped during the upgrade
## for safty, we just force a dashboard version bump for each and every release
## even if there is nothing changed in the app
echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade"
bad_app_count=$(( bad_app_count + 1))
get_vsn() {
commit="$1"
app_src_file="$2"
if [ "$commit" = 'HEAD' ]; then
if [ -f "$app_src_file" ]; then
grep vsn "$app_src_file" | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true
fi
else
git show "$commit":"$app_src_file" 2>/dev/null | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"' || true
fi
done < <(./scripts/find-apps.sh)
}
if [ $bad_app_count -gt 0 ]; then
exit 1
else
echo "apps version check successfully"
fi
check_apps() {
while read -r app_path; do
app=$(basename "$app_path")
src_file="$app_path/src/$app.app.src"
old_app_version="$(get_vsn "$latest_release" "$src_file")"
## TODO: delete it after new version is released with emqx app in apps dir
if [ "$app" = 'emqx' ] && [ "$old_app_version" = '' ]; then
old_app_version="$(get_vsn "$latest_release" 'src/emqx.app.src')"
fi
now_app_version="$(get_vsn 'HEAD' "$src_file")"
## TODO: delete it after new version is released with emqx app in apps dir
if [ "$app" = 'emqx' ] && [ "$now_app_version" = '' ]; then
now_app_version="$(get_vsn 'HEAD' 'src/emqx.app.src')"
fi
if [ -z "$now_app_version" ]; then
echo "failed_to_get_new_app_vsn for $app"
exit 1
fi
if [ -z "${old_app_version:-}" ]; then
echo "skiped checking new app ${app}"
elif [ "$old_app_version" = "$now_app_version" ]; then
lines="$(git diff --name-only "$latest_release"...HEAD \
-- "$app_path/src" \
-- "$app_path/priv" \
-- "$app_path/c_src")"
if [ "$lines" != '' ]; then
echo "$src_file needs a vsn bump (old=$old_app_version)"
echo "changed: $lines"
bad_app_count=$(( bad_app_count + 1))
fi
fi
done < <(./scripts/find-apps.sh)
if [ $bad_app_count -gt 0 ]; then
exit 1
else
echo "apps version check successfully"
fi
}
_main() {
if echo "${latest_release}" |grep -oE '[0-9]+.[0-9]+.[0-9]+' > /dev/null 2>&1; then
check_apps
else
echo "skiped unstable tag: ${latest_release}"
fi
}
_main

View File

@ -319,6 +319,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
puback(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
emqx:run_hook('message.publish_done', [Msg]),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
@ -343,6 +344,8 @@ return_with(Msg, {ok, Publishes, Session}) ->
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
%% execute hook here, because message record will be replaced by pubrel
emqx:run_hook('message.publish_done', [Msg]),
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
@ -439,11 +442,12 @@ deliver([Msg | More], Acc, Session) ->
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
emqx:run_hook('message.publish_done', [Msg]),
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
Session1 = case maybe_nack(Msg) of
@ -696,4 +700,3 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)),
setelement(Pos+1, Session, Value).