Merge pull request #12800 from id/0328-sync-release-56

sync release-56
This commit is contained in:
Ivan Dyachkov 2024-03-28 13:36:25 +01:00 committed by GitHub
commit 99ea63b25a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
116 changed files with 2208 additions and 647 deletions

View File

@ -24,7 +24,7 @@ jobs:
matrix:
profile:
- ['emqx', 'master', '5.3-2:1.15.7-26.2.1-2']
- ['emqx-enterprise', 'release-56', '5.3-2:1.15.7-25.3.2-2']
- ['emqx-enterprise', 'release-56', '5.3-2:1.15.7-26.2.1-2']
os:
- debian10
- ubuntu22.04

View File

@ -20,8 +20,8 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.7.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.6.0-beta.5
export EMQX_DASHBOARD_VERSION ?= v1.8.0
export EMQX_EE_DASHBOARD_VERSION ?= e1.6.0
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -32,7 +32,7 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.6.0-rc.1").
-define(EMQX_RELEASE_CE, "5.6.0").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.6.0-rc.1").
-define(EMQX_RELEASE_EE, "5.6.0").

View File

@ -116,9 +116,10 @@ app_specs() ->
app_specs(_Opts = #{}).
app_specs(Opts) ->
DefaultEMQXConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
{emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf}
{emqx, DefaultEMQXConf ++ ExtraEMQXConf}
].
get_mqtt_port(Node, Type) ->
@ -132,15 +133,6 @@ wait_nodeup(Node) ->
pong = net_adm:ping(Node)
).
wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) ->
#{override_env := Env} = proplists:get_value(gen_rpc, Apps),
Port = proplists:get_value(tcp_server_port, Env),
?retry(
_Sleep0 = 500,
_Attempts0 = 50,
false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port)
).
start_client(Opts0 = #{}) ->
Defaults = #{
port => 1883,

View File

@ -69,6 +69,7 @@
{emqx_resource,2}.
{emqx_retainer,1}.
{emqx_retainer,2}.
{emqx_router,1}.
{emqx_rule_engine,1}.
{emqx_shared_sub,1}.
{emqx_slow_subs,1}.

View File

@ -28,7 +28,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.1"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.1"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},

View File

@ -86,8 +86,35 @@ supported_version(API) ->
-spec announce(node(), atom()) -> ok.
announce(Node, App) ->
{ok, Data} = file:consult(?MODULE:versions_file(App)),
{atomic, ok} = mria:transaction(?COMMON_SHARD, fun ?MODULE:announce_fun/2, [Node, Data]),
ok.
%% replicant(5.6.0) will call old core(<5.6.0) announce_fun/2 is undef on old core
%% so we just use anonymous function to update.
case mria:transaction(?COMMON_SHARD, fun ?MODULE:announce_fun/2, [Node, Data]) of
{atomic, ok} ->
ok;
{aborted, {undef, [{?MODULE, announce_fun, _, _} | _]}} ->
{atomic, ok} = mria:transaction(
?COMMON_SHARD,
fun() ->
MS = ets:fun2ms(fun(#?TAB{key = {N, API}}) when N =:= Node ->
{N, API}
end),
OldKeys = mnesia:select(?TAB, MS, write),
_ = [
mnesia:delete({?TAB, Key})
|| Key <- OldKeys
],
%% Insert new records:
_ = [
mnesia:write(#?TAB{key = {Node, API}, version = Version})
|| {API, Version} <- Data
],
%% Update maximum supported version:
_ = [update_minimum(API) || {API, _} <- Data],
ok
end
),
ok
end.
-spec versions_file(atom()) -> file:filename_all().
versions_file(App) ->

View File

@ -237,25 +237,32 @@ log_formatter(HandlerName, Conf) ->
_ ->
conf_get("formatter", Conf)
end,
TsFormat = timstamp_format(Conf),
do_formatter(
Format, CharsLimit, SingleLine, TimeOffSet, Depth
Format, CharsLimit, SingleLine, TimeOffSet, Depth, TsFormat
).
%% auto | epoch | rfc3339
timstamp_format(Conf) ->
conf_get("timestamp_format", Conf).
%% helpers
do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth, TsFormat) ->
{emqx_logger_jsonfmt, #{
chars_limit => CharsLimit,
single_line => SingleLine,
time_offset => TimeOffSet,
depth => Depth
depth => Depth,
timestamp_format => TsFormat
}};
do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth, TsFormat) ->
{emqx_logger_textfmt, #{
template => [time, " [", level, "] ", msg, "\n"],
template => ["[", level, "] ", msg, "\n"],
chars_limit => CharsLimit,
single_line => SingleLine,
time_offset => TimeOffSet,
depth => Depth
depth => Depth,
timestamp_format => TsFormat
}}.
%% Don't record all logger message

View File

@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
{vsn, "5.2.0"},
{vsn, "5.3.0"},
{modules, []},
{registered, []},
{applications, [

View File

@ -154,7 +154,7 @@ do_authorize(ClientInfo, Action, Topic) ->
case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of
AuthzResult = #{result := Result} when Result == allow; Result == deny ->
From = maps:get(from, AuthzResult, unknown),
ok = log_result(ClientInfo, Topic, Action, From, Result),
ok = log_result(Topic, Action, From, Result),
emqx_hooks:run(
'client.check_authz_complete',
[ClientInfo, Action, Topic, Result, From]
@ -173,24 +173,28 @@ do_authorize(ClientInfo, Action, Topic) ->
deny
end.
log_result(#{username := Username}, Topic, Action, From, Result) ->
log_result(Topic, Action, From, Result) ->
LogMeta = fun() ->
#{
username => Username,
topic => Topic,
action => format_action(Action),
source => format_from(From)
}
end,
case Result of
allow ->
?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
deny ->
?SLOG_THROTTLE(
warning,
(LogMeta())#{msg => authorization_permission_denied}
)
end.
do_log_result(Action, Result, LogMeta).
do_log_result(_Action, allow, LogMeta) ->
?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"}, #{tag => "AUTHZ"});
do_log_result(?AUTHZ_PUBLISH_MATCH_MAP(_, _), deny, LogMeta) ->
%% for publish action, we do not log permission deny at warning level here
%% because it will be logged as cannot_publish_to_topic_due_to_not_authorized
?SLOG(info, (LogMeta())#{msg => "authorization_permission_denied"}, #{tag => "AUTHZ"});
do_log_result(_, deny, LogMeta) ->
?SLOG_THROTTLE(
warning,
(LogMeta())#{msg => authorization_permission_denied},
#{tag => "AUTHZ"}
).
%% @private Format authorization rules source.
format_from(default) ->

View File

@ -642,7 +642,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
msg => cannot_publish_to_topic_due_to_not_authorized,
reason => emqx_reason_codes:name(Rc)
},
#{topic => Topic}
#{topic => Topic, tag => "AUTHZ"}
),
case emqx:get_config([authorization, deny_action], ignore) of
ignore ->
@ -661,7 +661,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
msg => cannot_publish_to_topic_due_to_quota_exceeded,
reason => emqx_reason_codes:name(Rc)
},
#{topic => Topic}
#{topic => Topic, tag => "AUTHZ"}
),
case QoS of
?QOS_0 ->
@ -1166,9 +1166,11 @@ handle_call(
kick,
Channel = #channel{
conn_state = ConnState,
conninfo = #{proto_ver := ProtoVer}
conninfo = #{proto_ver := ProtoVer},
session = Session
}
) ->
emqx_session:destroy(Session),
Channel0 = maybe_publish_will_msg(kicked, Channel),
Channel1 =
case ConnState of
@ -1745,8 +1747,10 @@ fix_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
%%--------------------------------------------------------------------
%% Set log metadata
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId).
set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId} = ClientInfo}) ->
Username = maps:get(username, ClientInfo, undefined),
emqx_logger:set_metadata_clientid(ClientId),
emqx_logger:set_metadata_username(Username).
%%--------------------------------------------------------------------
%% Check banned
@ -1813,6 +1817,7 @@ authenticate(
Channel
);
_ ->
log_auth_failure("bad_authentication_method"),
{error, ?RC_BAD_AUTHENTICATION_METHOD}
end.
@ -1839,6 +1844,7 @@ do_authenticate(
auth_cache = AuthCache
}};
{error, Reason} ->
log_auth_failure(Reason),
{error, emqx_reason_codes:connack_error(Reason)}
end;
do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) ->
@ -1846,9 +1852,20 @@ do_authenticate(Credential, #channel{clientinfo = ClientInfo} = Channel) ->
{ok, AuthResult} ->
{ok, #{}, Channel#channel{clientinfo = merge_auth_result(ClientInfo, AuthResult)}};
{error, Reason} ->
log_auth_failure(Reason),
{error, emqx_reason_codes:connack_error(Reason)}
end.
log_auth_failure(Reason) ->
?SLOG_THROTTLE(
warning,
#{
msg => authentication_failure,
reason => Reason
},
#{tag => "AUTHN"}
).
%% Merge authentication result into ClientInfo
%% Authentication result may include:
%% 1. `is_superuser': The superuser flag from various backends

View File

@ -36,8 +36,7 @@
max_size/1,
is_full/1,
is_empty/1,
window/1,
query/2
window/1
]).
-export_type([inflight/0]).
@ -139,47 +138,3 @@ size(?INFLIGHT(Tree)) ->
-spec max_size(inflight()) -> non_neg_integer().
max_size(?INFLIGHT(MaxSize, _Tree)) ->
MaxSize.
-spec query(inflight(), #{continuation => Cont, limit := L}) ->
{[{key(), term()}], #{continuation := Cont, count := C}}
when
Cont :: none | end_of_data | key(),
L :: non_neg_integer(),
C :: non_neg_integer().
query(?INFLIGHT(Tree), #{limit := Limit} = Pager) ->
Count = gb_trees:size(Tree),
ContKey = maps:get(continuation, Pager, none),
{List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit),
{List, #{continuation => NextCont, count => Count}}.
iterator_from(none, Tree) ->
gb_trees:iterator(Tree);
iterator_from(ContKey, Tree) ->
It = gb_trees:iterator_from(ContKey, Tree),
case gb_trees:next(It) of
{ContKey, _Val, ItNext} -> ItNext;
_ -> It
end.
sublist(_It, 0) ->
{[], none};
sublist(It, Len) ->
{ListAcc, HasNext} = sublist(It, Len, []),
{lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
sublist(It, 0, Acc) ->
{Acc, gb_trees:next(It) =/= none};
sublist(It, Len, Acc) ->
case gb_trees:next(It) of
none ->
{Acc, false};
{Key, Val, ItNext} ->
sublist(ItNext, Len - 1, [{Key, Val} | Acc])
end.
next_cont(_Acc, false) ->
end_of_data;
next_cont([{LastKey, _LastVal} | _Acc], _HasNext) ->
LastKey;
next_cont([], _HasNext) ->
end_of_data.

View File

@ -43,6 +43,7 @@
-export([
set_metadata_peername/1,
set_metadata_clientid/1,
set_metadata_username/1,
set_proc_metadata/1,
set_primary_log_level/1,
set_log_handler_level/2,
@ -142,6 +143,12 @@ set_metadata_clientid(<<>>) ->
set_metadata_clientid(ClientId) ->
set_proc_metadata(#{clientid => ClientId}).
-spec set_metadata_username(emqx_types:username()) -> ok.
set_metadata_username(Username) when Username =:= undefined orelse Username =:= <<>> ->
ok;
set_metadata_username(Username) ->
set_proc_metadata(#{username => Username}).
-spec set_metadata_peername(peername_str()) -> ok.
set_metadata_peername(Peername) ->
set_proc_metadata(#{peername => Peername}).

View File

@ -285,9 +285,21 @@ json_obj_root(Data0, Config) ->
),
lists:filter(
fun({_, V}) -> V =/= undefined end,
[{time, Time}, {level, Level}, {msg, Msg}]
[{time, format_ts(Time, Config)}, {level, Level}, {msg, Msg}]
) ++ Data.
format_ts(Ts, #{timestamp_format := rfc3339, time_offset := Offset}) when is_integer(Ts) ->
iolist_to_binary(
calendar:system_time_to_rfc3339(Ts, [
{unit, microsecond},
{offset, Offset},
{time_designator, $T}
])
);
format_ts(Ts, _Config) ->
% auto | epoch
Ts.
json_obj(Data, Config) ->
maps:fold(
fun(K, V, D) ->

View File

@ -20,7 +20,7 @@
-export([check_config/1]).
-export([try_format_unicode/1]).
check_config(X) -> logger_formatter:check_config(X).
check_config(X) -> logger_formatter:check_config(maps:without([timestamp_format], X)).
%% Principle here is to delegate the formatting to logger_formatter:format/2
%% as much as possible, and only enrich the report with clientid, peername, topic, username
@ -35,7 +35,7 @@ format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(
false ->
maps:from_list(ReportList)
end,
logger_formatter:format(Event#{msg := {report, Report}}, Config);
fmt(Event#{msg := {report, Report}}, Config);
format(#{msg := {string, String}} = Event, Config) ->
%% copied from logger_formatter:format/2
%% unsure how this case is triggered
@ -45,7 +45,23 @@ format(#{msg := Msg0, meta := Meta} = Event, Config) ->
%% and logger:log(Level, "message", #{key => value})
Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_topic(Msg1, Meta),
logger_formatter:format(Event#{msg := Msg2}, Config).
fmt(Event#{msg := Msg2}, Config).
fmt(#{meta := #{time := Ts}} = Data, Config) ->
Timestamp =
case Config of
#{timestamp_format := epoch} ->
integer_to_list(Ts);
_ ->
% auto | rfc3339
TimeOffset = maps:get(time_offset, Config, ""),
calendar:system_time_to_rfc3339(Ts, [
{unit, microsecond},
{offset, TimeOffset},
{time_designator, $T}
])
end,
[Timestamp, " ", logger_formatter:format(Data, Config)].
%% Other report callbacks may only accept map() reports such as gen_server formatter
is_list_report_acceptable(#{report_cb := Cb}) ->
@ -69,7 +85,9 @@ enrich_report(ReportRaw, Meta) ->
ClientId = maps:get(clientid, Meta, undefined),
Peer = maps:get(peername, Meta, undefined),
Msg = maps:get(msg, ReportRaw, undefined),
Tag = maps:get(tag, ReportRaw, undefined),
%% TODO: move all tags to Meta so we can filter traces
%% based on tags (currently not supported)
Tag = maps:get(tag, ReportRaw, maps:get(tag, Meta, undefined)),
%% turn it into a list so that the order of the fields is determined
lists:foldl(
fun

View File

@ -98,6 +98,7 @@
-define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-define(INSERT_TS, mqueue_insert_ts).
-record(shift_opts, {
multiplier :: non_neg_integer(),
@ -172,54 +173,82 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) ->
MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff}
end.
-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) ->
{[message()], #{continuation := ContMsgId, count := C}}
-spec query(mqueue(), #{position => Pos, limit := Limit}) ->
{[message()], #{position := Pos, start := Pos}}
when
ContMsgId :: none | end_of_data | binary(),
C :: non_neg_integer(),
L :: non_neg_integer().
query(MQ, #{limit := Limit} = Pager) ->
ContMsgId = maps:get(continuation, Pager, none),
{List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit),
{List, #{continuation => NextCont, count => len(MQ)}}.
Pos :: none | {integer(), priority()},
Limit :: non_neg_integer().
query(MQ, #{limit := Limit} = PagerParams) ->
Pos = maps:get(position, PagerParams, none),
PQsList = ?PQUEUE:to_queues_list(MQ#mqueue.q),
{Msgs, NxtPos} = sublist(skip_until(PQsList, Pos), Limit, [], Pos),
{Msgs, #{position => NxtPos, start => first_msg_pos(PQsList)}}.
skip_until(MQ, none = _MsgId) ->
MQ;
skip_until(MQ, MsgId) ->
do_skip_until(MQ, MsgId).
do_skip_until(MQ, MsgId) ->
case out(MQ) of
{empty, MQ} ->
MQ;
{{value, #message{id = MsgId}}, Q1} ->
Q1;
{{value, _Msg}, Q1} ->
do_skip_until(Q1, MsgId)
first_msg_pos([]) ->
none;
first_msg_pos([{Prio, PQ} | T]) ->
case ?PQUEUE:out(PQ) of
{empty, _PQ} ->
first_msg_pos(T);
{{value, Msg}, _Q} ->
{insert_ts(Msg), Prio}
end.
sublist(_MQ, 0) ->
{[], none};
sublist(MQ, Len) ->
{ListAcc, HasNext} = sublist(MQ, Len, []),
{lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}.
sublist(MQ, 0, Acc) ->
{Acc, element(1, out(MQ)) =/= empty};
sublist(MQ, Len, Acc) ->
case out(MQ) of
{empty, _MQ} ->
{Acc, false};
{{value, Msg}, Q1} ->
sublist(Q1, Len - 1, [Msg | Acc])
skip_until(PQsList, none = _Pos) ->
PQsList;
skip_until(PQsList, {MsgPos, PrioPos}) ->
case skip_until_prio(PQsList, PrioPos) of
[{Prio, PQ} | T] ->
PQ1 = skip_until_msg(PQ, MsgPos),
[{Prio, PQ1} | T];
[] ->
[]
end.
next_cont(_Acc, false) ->
end_of_data;
next_cont([#message{id = Id} | _Acc], _HasNext) ->
Id;
next_cont([], _HasNext) ->
end_of_data.
skip_until_prio(PQsList, PrioPos) ->
lists:dropwhile(fun({Prio, _PQ}) -> Prio > PrioPos end, PQsList).
skip_until_msg(PQ, MsgPos) ->
case ?PQUEUE:out(PQ) of
{empty, PQ1} ->
PQ1;
{{value, Msg}, PQ1} ->
case insert_ts(Msg) > MsgPos of
true -> PQ;
false -> skip_until_msg(PQ1, MsgPos)
end
end.
sublist(PQs, Len, Acc, LastPosPrio) when PQs =:= []; Len =:= 0 ->
{Acc, LastPosPrio};
sublist([{Prio, PQ} | T], Len, Acc, LastPosPrio) ->
{SingleQAcc, SingleQSize} = sublist_single_pq(Prio, PQ, Len, [], 0),
Acc1 = Acc ++ lists:reverse(SingleQAcc),
NxtPosPrio =
case SingleQAcc of
[H | _] -> {insert_ts(H), Prio};
[] -> LastPosPrio
end,
case SingleQSize =:= Len of
true ->
{Acc1, NxtPosPrio};
false ->
sublist(T, Len - SingleQSize, Acc1, NxtPosPrio)
end.
sublist_single_pq(_Prio, _PQ, 0, Acc, AccSize) ->
{Acc, AccSize};
sublist_single_pq(Prio, PQ, Len, Acc, AccSize) ->
case ?PQUEUE:out(0, PQ) of
{empty, _PQ} ->
{Acc, AccSize};
{{value, Msg}, PQ1} ->
Msg1 = with_prio(Msg, Prio),
sublist_single_pq(Prio, PQ1, Len - 1, [Msg1 | Acc], AccSize + 1)
end.
with_prio(#message{extra = Extra} = Msg, Prio) ->
Msg#message{extra = Extra#{mqueue_priority => Prio}}.
to_list(MQ, Acc) ->
case out(MQ) of
@ -256,14 +285,15 @@ in(
) ->
Priority = get_priority(Topic, PTab, Dp),
PLen = ?PQUEUE:plen(Priority, Q),
Msg1 = with_ts(Msg),
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
true ->
%% reached max length, drop the oldest message
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
Q2 = ?PQUEUE:in(Msg1, Priority, Q1),
{without_ts(DroppedMsg), MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg1, Priority, Q)}}
end.
-spec out(mqueue()) -> {empty | {value, message()}, mqueue()}.
@ -280,7 +310,7 @@ out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts
last_prio = Prio,
p_credit = get_credits(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
{{value, without_ts(Val)}, MQ1};
out(MQ = #mqueue{q = Q, p_credit = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
@ -288,8 +318,12 @@ out(MQ = #mqueue{q = Q, p_credit = 0}) ->
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
{R, Q2} =
case ?PQUEUE:out(Q) of
{{value, Val}, Q1} -> {{value, without_ts(Val)}, Q1};
Other -> Other
end,
{R, MQ#mqueue{q = Q2, len = Len - 1, p_credit = Cnt - 1}}.
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
@ -359,3 +393,23 @@ p_table(PTab = #{}) ->
);
p_table(PTab) ->
PTab.
%% This is used to sort/traverse messages in query/2
with_ts(#message{extra = Extra} = Msg) ->
TsNano = erlang:system_time(nanosecond),
Extra1 =
case is_map(Extra) of
true -> Extra;
%% extra field has not being used before EMQX 5.4.0
%% and defaulted to an empty list,
%% if it's not a map it's safe to overwrite it
false -> #{}
end,
Msg#message{extra = Extra1#{?INSERT_TS => TsNano}}.
without_ts(#message{extra = Extra} = Msg) ->
Msg#message{extra = maps:remove(?INSERT_TS, Extra)};
without_ts(Msg) ->
Msg.
insert_ts(#message{extra = #{?INSERT_TS := Ts}}) -> Ts.

View File

@ -36,7 +36,8 @@
-export([
create/4,
open/4,
destroy/1
destroy/1,
kick_offline_session/1
]).
-export([
@ -220,6 +221,15 @@ destroy(#{clientid := ClientID}) ->
destroy_session(ClientID) ->
session_drop(ClientID, destroy).
-spec kick_offline_session(emqx_types:clientid()) -> ok.
kick_offline_session(ClientID) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
session_drop(ClientID, kicked);
false ->
ok
end.
%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
@ -292,7 +302,9 @@ info(awaiting_rel_max, #{props := Conf}) ->
info(await_rel_timeout, #{props := _Conf}) ->
%% TODO: currently this setting is ignored:
%% maps:get(await_rel_timeout, Conf).
0.
0;
info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs ->
{error, not_implemented}.
-spec stats(session()) -> emqx_types:stats().
stats(Session) ->

View File

@ -21,11 +21,12 @@
-record(ps_route, {
topic :: binary(),
dest :: emqx_persistent_session_ds:id()
dest :: emqx_persistent_session_ds:id() | '_'
}).
-record(ps_routeidx, {
entry :: '$1' | emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
unused = [] :: nil()
unused = [] :: nil() | '_'
}).
-endif.

View File

@ -32,6 +32,12 @@
foldl_routes/2
]).
%% Topics API
-export([
stream/1,
stats/1
]).
-export([cleanup_routes/1]).
-export([print_routes/1]).
-export([topics/0]).
@ -196,6 +202,26 @@ foldl_routes(FoldFun, AccIn) ->
foldr_routes(FoldFun, AccIn) ->
fold_routes(foldr, FoldFun, AccIn).
%%--------------------------------------------------------------------
%% Topic API
%%--------------------------------------------------------------------
%% @doc Create a `emqx_utils_stream:stream(#route{})` out of the router state,
%% potentially filtered by a topic or topic filter. The stream emits `#route{}`
%% records since this is what `emqx_mgmt_api_topics` knows how to deal with.
-spec stream(_MTopic :: '_' | emqx_types:topic()) ->
emqx_utils_stream:stream(emqx_types:route()).
stream(MTopic) ->
emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
%% @doc Retrieve router stats.
%% n_routes: total number of routes, should be equal to the length of `stream('_')`.
-spec stats(n_routes) -> non_neg_integer().
stats(n_routes) ->
NTopics = ets:info(?PS_ROUTER_TAB, size),
NFilters = ets:info(?PS_FILTERS_TAB, size),
emqx_maybe:define(NTopics, 0) + emqx_maybe:define(NFilters, 0).
%%--------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------
@ -225,6 +251,12 @@ get_dest_session_id({_, DSSessionId}) ->
get_dest_session_id(DSSessionId) ->
DSSessionId.
export_route(#ps_route{topic = Topic, dest = Dest}) ->
#route{topic = Topic, dest = Dest}.
export_routeidx(#ps_routeidx{entry = M}) ->
#route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
match_to_route(M) ->
#ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
@ -242,3 +274,35 @@ list_route_tab_topics() ->
mria_route_tab_delete(Route) ->
mria:dirty_delete_object(?PS_ROUTER_TAB, Route).
%% @doc Create a `emqx_utils_stream:stream(#route{})` out of contents of either of
%% 2 route tables, optionally filtered by a topic or topic filter. If the latter is
%% specified, then it doesn't make sense to scan through `?PS_ROUTER_TAB` if it's
%% a wildcard topic, and vice versa for `?PS_FILTERS_TAB` if it's not, so we optimize
%% it away by returning an empty stream in those cases.
stream(Tab = ?PS_ROUTER_TAB, MTopic) ->
case MTopic == '_' orelse not emqx_topic:wildcard(MTopic) of
true ->
MatchSpec = #ps_route{topic = MTopic, _ = '_'},
mk_tab_stream(Tab, MatchSpec, fun export_route/1);
false ->
emqx_utils_stream:empty()
end;
stream(Tab = ?PS_FILTERS_TAB, MTopic) ->
case MTopic == '_' orelse emqx_topic:wildcard(MTopic) of
true ->
MatchSpec = #ps_routeidx{entry = emqx_trie_search:make_pat(MTopic, '_'), _ = '_'},
mk_tab_stream(Tab, MatchSpec, fun export_routeidx/1);
false ->
emqx_utils_stream:empty()
end.
mk_tab_stream(Tab, MatchSpec, Mapper) ->
%% NOTE: Currently relying on the fact that tables are backed by ETSes.
emqx_utils_stream:map(
Mapper,
emqx_utils_stream:ets(fun
(undefined) -> ets:match_object(Tab, MatchSpec, 1);
(Cont) -> ets:match_object(Cont)
end)
).

View File

@ -46,6 +46,7 @@
len/1,
plen/2,
to_list/1,
to_queues_list/1,
from_list/1,
in/2,
in/3,
@ -121,6 +122,18 @@ to_list({pqueue, Queues}) ->
{0, V} <- to_list(Q)
].
-spec to_queues_list(pqueue()) -> [{priority(), squeue()}].
to_queues_list({queue, _In, _Out, _Len} = Squeue) ->
[{0, Squeue}];
to_queues_list({pqueue, Queues}) ->
lists:sort(
fun
({infinity = _P1, _}, {_P2, _}) -> true;
({P1, _}, {P2, _}) -> P1 >= P2
end,
[{maybe_negate_priority(P), Q} || {P, Q} <- Queues]
).
-spec from_list([{priority(), any()}]) -> pqueue().
from_list(L) ->
lists:foldl(fun({P, E}, Q) -> in(E, P, Q) end, new(), L).

View File

@ -58,7 +58,7 @@
]).
%% Topics API
-export([select/3]).
-export([stream/1]).
-export([print_routes/1]).
@ -92,9 +92,11 @@
]).
-export_type([dest/0]).
-export_type([schemavsn/0]).
-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-type schemavsn() :: v1 | v2.
%% Operation :: {add, ...} | {delete, ...}.
-type batch() :: #{batch_route() => _Operation :: tuple()}.
@ -266,18 +268,15 @@ mria_batch_v1(Batch) ->
batch_get_action(Op) ->
element(1, Op).
-spec select(Spec, _Limit :: pos_integer(), Continuation) ->
{[emqx_types:route()], Continuation} | '$end_of_table'
when
Spec :: {_TopicPat, _DestPat},
Continuation :: term() | '$end_of_table'.
select(MatchSpec, Limit, Cont) ->
select(get_schema_vsn(), MatchSpec, Limit, Cont).
-spec stream(_Spec :: {_TopicPat, _DestPat}) ->
emqx_utils_stream:stream(emqx_types:route()).
stream(MatchSpec) ->
stream(get_schema_vsn(), MatchSpec).
select(v2, MatchSpec, Limit, Cont) ->
select_v2(MatchSpec, Limit, Cont);
select(v1, MatchSpec, Limit, Cont) ->
select_v1(MatchSpec, Limit, Cont).
stream(v2, MatchSpec) ->
stream_v2(MatchSpec);
stream(v1, MatchSpec) ->
stream_v1(MatchSpec).
-spec topics() -> list(emqx_types:topic()).
topics() ->
@ -452,10 +451,8 @@ cleanup_routes_v1_fallback(Node) ->
]
end).
select_v1({MTopic, MDest}, Limit, undefined) ->
ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit);
select_v1(_Spec, _Limit, Cont) ->
ets:select(Cont).
stream_v1(Spec) ->
mk_route_stream(?ROUTE_TAB, Spec).
list_topics_v1() ->
list_route_tab_topics().
@ -591,36 +588,27 @@ make_route_rec_pat(DestPattern) ->
[{1, route}, {#route.dest, DestPattern}]
).
select_v2(Spec, Limit, undefined) ->
Stream = mk_route_stream(Spec),
select_next(Limit, Stream);
select_v2(_Spec, Limit, Stream) ->
select_next(Limit, Stream).
select_next(N, Stream) ->
case emqx_utils_stream:consume(N, Stream) of
{Routes, SRest} ->
{Routes, SRest};
Routes ->
{Routes, '$end_of_table'}
end.
mk_route_stream(Spec) ->
stream_v2(Spec) ->
emqx_utils_stream:chain(
mk_route_stream(route, Spec),
mk_route_stream(filter, Spec)
mk_route_stream(?ROUTE_TAB, Spec),
mk_route_stream(?ROUTE_TAB_FILTERS, Spec)
).
mk_route_stream(route, Spec) ->
emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end);
mk_route_stream(filter, {MTopic, MDest}) ->
mk_route_stream(Tab = ?ROUTE_TAB, {MTopic, MDest}) ->
emqx_utils_stream:ets(fun
(undefined) ->
ets:match_object(Tab, #route{topic = MTopic, dest = MDest}, 1);
(Cont) ->
ets:match_object(Cont)
end);
mk_route_stream(Tab = ?ROUTE_TAB_FILTERS, {MTopic, MDest}) ->
emqx_utils_stream:map(
fun routeidx_to_route/1,
emqx_utils_stream:ets(
fun
(undefined) ->
MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)},
ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1);
ets:match_object(Tab, MatchSpec, 1);
(Cont) ->
ets:match_object(Cont)
end
@ -657,8 +645,8 @@ match_to_route(M) ->
-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
-type schemavsn() :: v1 | v2.
%% @doc Get the schema version in use.
%% BPAPI RPC Target @ emqx_router_proto
-spec get_schema_vsn() -> schemavsn().
get_schema_vsn() ->
persistent_term:get(?PT_SCHEMA_VSN).
@ -668,23 +656,23 @@ init_schema() ->
ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
ok = emqx_trie:wait_for_tables(),
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
Schema = choose_schema_vsn(ConfSchema),
{ClusterSchema, ClusterState} = discover_cluster_schema_vsn(),
Schema = choose_schema_vsn(ConfSchema, ClusterSchema, ClusterState),
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
case Schema of
ConfSchema ->
case Schema =:= ConfSchema of
true ->
?SLOG(info, #{
msg => "routing_schema_used",
schema => Schema
});
_ ->
false ->
?SLOG(notice, #{
msg => "configured_routing_schema_ignored",
schema_in_use => Schema,
configured => ConfSchema,
reason =>
"Could not use configured routing storage schema because "
"there are already non-empty routing tables pertaining to "
"another schema."
"cluster is already running with a different schema."
})
end.
@ -693,34 +681,147 @@ deinit_schema() ->
_ = persistent_term:erase(?PT_SCHEMA_VSN),
ok.
-spec choose_schema_vsn(schemavsn()) -> schemavsn().
choose_schema_vsn(ConfType) ->
IsEmptyIndex = emqx_trie:empty(),
IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
case {IsEmptyIndex, IsEmptyFilters} of
{true, true} ->
ConfType;
{false, true} ->
v1;
{true, false} ->
v2;
{false, false} ->
?SLOG(critical, #{
msg => "conflicting_routing_schemas_detected_in_cluster",
configured => ConfType,
-spec discover_cluster_schema_vsn() ->
{schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
discover_cluster_schema_vsn() ->
discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
-spec discover_cluster_schema_vsn([node()]) ->
{schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
discover_cluster_schema_vsn([]) ->
%% single node
{undefined, []};
discover_cluster_schema_vsn(Nodes) ->
Responses = lists:zipwith(
fun
(Node, {ok, Schema}) ->
{Node, Schema, configured};
(Node, {error, {exception, undef, _Stacktrace}}) ->
%% No such function on the remote node, assuming it doesn't know about v2 routing.
{Node, v1, legacy};
(Node, {error, {exception, badarg, _Stacktrace}}) ->
%% Likely, persistent term is not defined yet.
{Node, unknown, starting};
(Node, Error) ->
{Node, unknown, Error}
end,
Nodes,
emqx_router_proto_v1:get_routing_schema_vsn(Nodes)
),
case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of
[Vsn] when Vsn =:= v1; Vsn =:= v2 ->
{Vsn, Responses};
[] ->
?SLOG(warning, #{
msg => "cluster_routing_schema_discovery_failed",
responses => Responses,
reason =>
"There are records in the routing tables related to both v1 "
"and v2 storage schemas. This probably means that some nodes "
"in the cluster use v1 schema and some use v2, independently "
"of each other. The routing is likely broken. Manual intervention "
"and full cluster restart is required. This node will shut down."
"Could not determine configured routing storage schema in peer nodes."
}),
{undefined, Responses};
[_ | _] ->
Desc = schema_conflict_reason(config, Responses),
io:format(standard_error, "Error: ~ts~n", [Desc]),
?SLOG(critical, #{
msg => "conflicting_routing_schemas_in_cluster",
responses => Responses,
description => Desc
}),
error(conflicting_routing_schemas_configured_in_cluster)
end.
-spec choose_schema_vsn(
schemavsn(),
_ClusterSchema :: schemavsn() | undefined,
_ClusterState :: [{node(), schemavsn() | undefined, _Details}]
) -> schemavsn().
choose_schema_vsn(ConfSchema, ClusterSchema, State) ->
case detect_table_schema_vsn() of
[] ->
%% No records in the tables, use schema configured in the cluster if any,
%% otherwise use configured.
emqx_maybe:define(ClusterSchema, ConfSchema);
[Schema] when Schema =:= ClusterSchema ->
%% Table contents match configured schema in the cluster.
Schema;
[Schema] when ClusterSchema =:= undefined ->
%% There are existing records following some schema, we have to use it.
Schema;
_Conflicting when ClusterSchema =/= undefined ->
%% There are existing records in both v1 and v2 schema,
%% we have to use what the peer nodes agreed on.
%% because it could be THIS node which caused the cnoflict.
%%
%% The stale records will be left-over, but harmless
Desc =
"Conflicting schema version detected for routing records, but "
"all the peer nodes are running the same version, so this node "
"will use the same schema but discard the harmless stale records. "
"This warning will go away after the next full cluster (non-rolling) restart.",
?SLOG(warning, #{
msg => "conflicting_routing_storage_detected",
resolved => ClusterSchema,
description => Desc
}),
ClusterSchema;
_Conflicting ->
Desc = schema_conflict_reason(records, State),
io:format(standard_error, "Error: ~ts~n", [Desc]),
?SLOG(critical, #{
msg => "conflicting_routing_storage_in_cluster",
description => Desc
}),
error(conflicting_routing_schemas_detected_in_cluster)
end.
schema_conflict_reason(Type, State) ->
Observe =
case Type of
config ->
"Peer nodes have route storage schema resolved into conflicting versions.\n";
records ->
"There are conflicting routing records found.\n"
end,
Cause =
"\nThis was caused by a race-condition when the cluster was rolling upgraded "
"from an older version to 5.4.0, 5.4.1, 5.5.0 or 5.5.1."
"\nThis node cannot boot before the conflicts are resolved.\n",
Observe ++ Cause ++ mk_conflict_resolution_action(State).
detect_table_schema_vsn() ->
lists:flatten([
[v1 || _NonEmptyTrieIndex = not emqx_trie:empty()],
[v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)]
]).
is_empty(Tab) ->
ets:first(Tab) =:= '$end_of_table'.
mk_conflict_resolution_action(State) ->
NodesV1 = [Node || {Node, v1, _} <- State],
NodesUnknown = [Node || {Node, unknown, _} <- State],
Format =
"There are two ways to resolve the conflict:"
"\n"
"\nA: Full cluster restart: stop ALL running nodes one by one "
"and restart them in the reversed order."
"\n"
"\nB: Force v1 nodes to clean up their routes."
"\n Following EMQX nodes are running with v1 schema: ~0p."
"\n 1. Stop listeners with command \"emqx eval 'emqx_listener:stop()'\" in all v1 nodes"
"\n 2. Wait until they are safe to restart."
"\n This could take some time, depending on the number of clients and their subscriptions."
"\n Below conditions should be true for each of the nodes in order to proceed:"
"\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`."
"\n b) Command 'emqx ctl topics list' prints No topics.`"
"\n 3. Upgrade the nodes to 5.6.0 or newer.",
FormatUnkown =
"Additionally, the following nodes were unreachable during startup: ~0p."
"It is strongly advised to include them in the manual resolution procedure as well.",
Message = io_lib:format(Format, [NodesV1]),
MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []],
unicode:characters_to_list([Message, "\n", MessageUnknown]).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

View File

@ -154,6 +154,8 @@
-define(DEFAULT_BATCH_N, 1000).
-define(INFLIGHT_INSERT_TS, inflight_insert_ts).
%%--------------------------------------------------------------------
%% Init a Session
%%--------------------------------------------------------------------
@ -280,8 +282,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) ->
{InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams),
{[I#inflight_data.message || {_, I} <- InflightList], Meta};
inflight_query(Inflight, PagerParams);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
@ -407,7 +408,7 @@ puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session1 = Session#session{inflight = Inflight1},
{ok, Replies, Session2} = dequeue(ClientInfo, Session1),
{ok, Msg, Replies, Session2};
{ok, without_inflight_insert_ts(Msg), Replies, Session2};
{value, _} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -426,7 +427,7 @@ pubrec(PacketId, Session = #session{inflight = Inflight}) ->
{value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
Update = Data#inflight_data{phase = wait_comp},
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{ok, without_inflight_insert_ts(Msg), Session#session{inflight = Inflight1}};
{value, _} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -462,7 +463,7 @@ pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session1 = Session#session{inflight = Inflight1},
{ok, Replies, Session2} = dequeue(ClientInfo, Session1),
{ok, Msg, Replies, Session2};
{ok, without_inflight_insert_ts(Msg), Replies, Session2};
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -650,7 +651,7 @@ do_retry_delivery(
Msg1 = emqx_message:set_flag(dup, true, Msg),
Update = Data#inflight_data{message = Msg1, timestamp = Now},
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{[{PacketId, Msg1} | Acc], Inflight1}
{[{PacketId, without_inflight_insert_ts(Msg1)} | Acc], Inflight1}
end;
do_retry_delivery(_ClientInfo, PacketId, Data, Now, Acc, Inflight) ->
Update = Data#inflight_data{timestamp = Now},
@ -739,7 +740,7 @@ replay(ClientInfo, Session) ->
({PacketId, #inflight_data{phase = wait_comp}}) ->
{pubrel, PacketId};
({PacketId, #inflight_data{message = Msg}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
{PacketId, without_inflight_insert_ts(emqx_message:set_flag(dup, true, Msg))}
end,
emqx_inflight:to_list(Session#session.inflight)
),
@ -786,7 +787,7 @@ redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
{true, Msg};
{true, without_inflight_insert_ts(Msg)};
({_PacketId, #inflight_data{}}) ->
false
end,
@ -822,22 +823,83 @@ publish_will_message_now(#session{} = Session, #message{} = WillMsg) ->
%% Helper functions
%%--------------------------------------------------------------------
-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
-compile(
{inline, [
sort_fun/2, batch_n/1, inflight_insert_ts/1, without_inflight_insert_ts/1, with_ts/1, age/2
]}
).
sort_fun({_, A}, {_, B}) ->
A#inflight_data.timestamp =< B#inflight_data.timestamp.
query_sort_fun({_, #inflight_data{message = A}}, {_, #inflight_data{message = B}}) ->
inflight_insert_ts(A) =< inflight_insert_ts(B).
-spec inflight_query(emqx_inflight:inflight(), #{
position => integer() | none, limit := pos_integer()
}) ->
{[emqx_types:message()], #{position := integer() | none, start := integer() | none}}.
inflight_query(Inflight, #{limit := Limit} = PagerParams) ->
InflightL = emqx_inflight:to_list(fun query_sort_fun/2, Inflight),
StartPos =
case InflightL of
[{_, #inflight_data{message = FirstM}} | _] -> inflight_insert_ts(FirstM);
[] -> none
end,
Position = maps:get(position, PagerParams, none),
InflightMsgs = sublist_from_pos(InflightL, Position, Limit),
NextPos =
case InflightMsgs of
[_ | _] = L ->
inflight_insert_ts(lists:last(L));
[] ->
Position
end,
{InflightMsgs, #{start => StartPos, position => NextPos}}.
sublist_from_pos(InflightList, none = _Position, Limit) ->
inflight_msgs_sublist(InflightList, Limit);
sublist_from_pos(InflightList, Position, Limit) ->
Inflight = lists:dropwhile(
fun({_, #inflight_data{message = M}}) ->
inflight_insert_ts(M) =< Position
end,
InflightList
),
inflight_msgs_sublist(Inflight, Limit).
%% Small optimization to get sublist and drop keys in one traversal
inflight_msgs_sublist([{_Key, #inflight_data{message = Msg}} | T], Limit) when Limit > 0 ->
[Msg | inflight_msgs_sublist(T, Limit - 1)];
inflight_msgs_sublist(_, _) ->
[].
inflight_insert_ts(#message{extra = #{?INFLIGHT_INSERT_TS := Ts}}) -> Ts.
without_inflight_insert_ts(#message{extra = Extra} = Msg) ->
Msg#message{extra = maps:remove(?INFLIGHT_INSERT_TS, Extra)}.
batch_n(Inflight) ->
case emqx_inflight:max_size(Inflight) of
0 -> ?DEFAULT_BATCH_N;
Sz -> Sz - emqx_inflight:size(Inflight)
end.
with_ts(Msg) ->
with_ts(#message{extra = Extra} = Msg) ->
InsertTsNano = erlang:system_time(nanosecond),
%% This is used to sort/traverse messages in inflight_query/2
Extra1 =
case is_map(Extra) of
true -> Extra;
%% extra field has not being used before EMQX 5.4.0 and defaulted to an empty list,
%% if it's not a map it's safe to overwrite it
false -> #{}
end,
Msg1 = Msg#message{extra = Extra1#{?INFLIGHT_INSERT_TS => InsertTsNano}},
#inflight_data{
phase = wait_ack,
message = Msg,
timestamp = erlang:system_time(millisecond)
message = Msg1,
timestamp = erlang:convert_time_unit(InsertTsNano, nanosecond, millisecond)
}.
age(Now, Ts) -> Now - Ts.

View File

@ -169,7 +169,9 @@ filters(#{type := ip_address, filter := Filter, name := Name}) ->
formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
{emqx_trace_formatter, #{
%% template is for ?SLOG message not ?TRACE.
template => [time, " [", level, "] ", msg, "\n"],
%% XXX: Don't need to print the time field in logger_formatter due to we manually concat it
%% in emqx_logger_textfmt:fmt/2
template => ["[", level, "] ", msg, "\n"],
single_line => true,
max_size => unlimited,
depth => unlimited,

View File

@ -0,0 +1,37 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_router_proto_v1).
-behaviour(emqx_bpapi).
-export([introduced_in/0]).
-export([
get_routing_schema_vsn/1
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 3_000).
introduced_in() ->
"5.6.0".
-spec get_routing_schema_vsn([node()]) ->
[emqx_rpc:erpc(emqx_router:schemavsn())].
get_routing_schema_vsn(Nodes) ->
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT).

View File

@ -59,7 +59,8 @@
-define(FORCE_DELETED_APIS, [
{emqx_statsd, 1},
{emqx_plugin_libs, 1},
{emqx_persistent_session, 1}
{emqx_persistent_session, 1},
{emqx_ds, 3}
]).
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
@ -80,6 +81,12 @@
"emqx_mgmt_api:do_query/2, emqx_mgmt_api:collect_total_from_tail_nodes/2"
).
%% Only the APIs for the features that haven't reached General
%% Availability can be added here:
-define(EXPERIMENTAL_APIS, [
{emqx_ds, 4}
]).
-define(XREF, myxref).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -110,7 +117,7 @@ check_compat(DumpFilenames) ->
Dumps = lists:map(
fun(FN) ->
{ok, [Dump]} = file:consult(FN),
Dump
Dump#{release => filename:basename(FN)}
end,
DumpFilenames
),
@ -119,48 +126,58 @@ check_compat(DumpFilenames) ->
%% Note: sets nok flag
-spec check_compat(fulldump(), fulldump()) -> ok.
check_compat(Dump1 = #{release := Rel1}, Dump2 = #{release := Rel2}) ->
check_compat(Dump1 = #{release := Rel1}, Dump2 = #{release := Rel2}) when Rel2 >= Rel1 ->
check_api_immutability(Dump1, Dump2),
Rel2 >= Rel1 andalso
typecheck_apis(Dump1, Dump2).
typecheck_apis(Dump1, Dump2);
check_compat(_, _) ->
ok.
%% It's not allowed to change BPAPI modules. Check that no changes
%% have been made. (sets nok flag)
-spec check_api_immutability(fulldump(), fulldump()) -> ok.
check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api := APIs2}) when
Rel2 >= Rel1
->
check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api := APIs2}) ->
%% TODO: Handle API deprecation
_ = maps:map(
fun(Key = {API, Version}, Val) ->
case maps:get(Key, APIs2, undefined) of
Val ->
fun(Key, Val) ->
case lists:member(Key, ?EXPERIMENTAL_APIS) of
true ->
ok;
undefined ->
case lists:member({API, Version}, ?FORCE_DELETED_APIS) of
true ->
ok;
false ->
setnok(),
logger:error(
"API ~p v~p was removed in release ~p without being deprecated.",
[API, Version, Rel2]
)
end;
_Val ->
setnok(),
logger:error(
"API ~p v~p was changed between ~p and ~p. Backplane API should be immutable.",
[API, Version, Rel1, Rel2]
)
false ->
do_check_api_immutability(Rel1, Rel2, APIs2, Key, Val)
end
end,
APIs1
),
ok;
check_api_immutability(_, _) ->
ok.
do_check_api_immutability(Rel1, Rel2, APIs2, Key = {API, Version}, Val) ->
case maps:get(Key, APIs2, undefined) of
Val ->
ok;
undefined ->
case lists:member(Key, ?FORCE_DELETED_APIS) of
true ->
ok;
false ->
setnok(),
logger:error(
"API ~p v~p was removed in release ~p without being deprecated. "
"Old release: ~p",
[API, Version, Rel2, Rel1]
)
end;
OldVal ->
setnok(),
logger:error(
"API ~p v~p was changed between ~p and ~p. Backplane API should be immutable.",
[API, Version, Rel1, Rel2]
),
D21 = maps:get(calls, Val) -- maps:get(calls, OldVal),
D12 = maps:get(calls, OldVal) -- maps:get(calls, Val),
logger:error("Added calls:~n ~p", [D21]),
logger:error("Removed calls:~n ~p", [D12])
end.
filter_calls(Calls) ->
F = fun({{Mf, _, _}, {Mt, _, _}}) ->
(not lists:member(Mf, ?FORCE_DELETED_MODULES)) andalso
@ -181,8 +198,8 @@ typecheck_apis(
AllCalls = filter_calls(AllCalls0),
lists:foreach(
fun({From, To}) ->
Caller = get_param_types(CallerSigs, From),
Callee = get_param_types(CalleeSigs, To),
Caller = get_param_types(CallerSigs, From, From),
Callee = get_param_types(CalleeSigs, From, To),
%% TODO: check return types
case typecheck_rpc(Caller, Callee) of
[] ->
@ -226,8 +243,8 @@ typecheck_rpc(Caller, Callee) ->
Callee
).
-spec get_param_types(dialyzer_dump(), emqx_bpapi:call()) -> param_types().
get_param_types(Signatures, {M, F, A}) ->
%%-spec get_param_types(dialyzer_dump(), emqx_bpapi:call()) -> param_types().
get_param_types(Signatures, From, {M, F, A}) ->
Arity = length(A),
case Signatures of
#{{M, F, Arity} := {_RetType, AttrTypes}} ->
@ -235,7 +252,7 @@ get_param_types(Signatures, {M, F, A}) ->
Arity = length(AttrTypes),
maps:from_list(lists:zip(A, AttrTypes));
_ ->
logger:critical("Call ~p:~p/~p is not found in PLT~n", [M, F, Arity]),
logger:critical("Call ~p:~p/~p from ~p is not found in PLT~n", [M, F, Arity, From]),
error({badkey, {M, F, A}})
end.

View File

@ -126,73 +126,3 @@ t_to_list(_) ->
),
ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)],
?assertEqual(ExpList, emqx_inflight:to_list(Inflight)).
t_query(_) ->
EmptyInflight = emqx_inflight:new(500),
?assertMatch(
{[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50})
),
Inflight = lists:foldl(
fun(Seq, QAcc) ->
emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc)
end,
EmptyInflight,
lists:reverse(lists:seq(1, 114))
),
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
Limit = 10,
PagerParams = #{continuation => Cont, limit => Limit},
{Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)),
?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)),
?assertMatch(
#{count := 114, continuation := IntCont} when is_integer(IntCont),
Meta
),
NextCont
end,
none,
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{
continuation => LastCont, limit => 10
}),
?assertEqual(4, length(LastPartialPage)),
?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)),
?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)),
?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
?assertMatch(
{[], #{continuation := end_of_data}},
emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10})
),
{LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual({1, <<"1">>}, hd(LargePage)),
?assertEqual({114, <<"114">>}, lists:last(LargePage)),
?assertMatch(#{continuation := end_of_data}, LargeMeta),
{FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual({1, <<"1">>}, hd(FullPage)),
?assertEqual({114, <<"114">>}, lists:last(FullPage)),
?assertMatch(#{continuation := end_of_data}, FullMeta),
{EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{continuation := none, count := 114}, EmptyMeta).

View File

@ -284,13 +284,15 @@ t_dropped(_) ->
t_query(_) ->
EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}),
?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})),
?assertMatch(
{[], #{continuation := end_of_data}},
?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50})
?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
RandPos = {erlang:system_time(nanosecond), 0},
?assertEqual(
{[], #{position => RandPos, start => none}},
?Q:query(EmptyQ, #{position => RandPos, limit => 50})
),
?assertMatch(
{[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50})
?assertEqual(
{[], #{position => none, start => none}},
?Q:query(EmptyQ, #{continuation => none, limit => 50})
),
Q = lists:foldl(
@ -303,52 +305,146 @@ t_query(_) ->
lists:seq(1, 114)
),
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
{LastPos, LastStart} = lists:foldl(
fun(PageSeq, {Pos, PrevStart}) ->
Limit = 10,
PagerParams = #{continuation => Cont, limit => Limit},
{Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams),
PagerParams = #{position => Pos, limit => Limit},
{Page, #{position := NextPos, start := Start}} = ?Q:query(Q, PagerParams),
?assertEqual(10, length(Page)),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(
ExpFirstPayload,
emqx_message:payload(lists:nth(1, Page)),
#{page_seq => PageSeq, page => Page, meta => Meta}
),
?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))),
?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta),
NextCont
FirstMsg = lists:nth(1, Page),
LastMsg = lists:nth(10, Page),
?assertEqual(ExpFirstPayload, emqx_message:payload(FirstMsg)),
?assertEqual(ExpLastPayload, emqx_message:payload(LastMsg)),
%% start value must not change as Mqueue is not modified during traversal
NextStart =
case PageSeq of
1 ->
?assertEqual({mqueue_ts(FirstMsg), 0}, Start),
Start;
_ ->
?assertEqual(PrevStart, Start),
PrevStart
end,
{NextPos, NextStart}
end,
none,
{none, none},
lists:seq(1, 11)
),
{LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}),
{LastPartialPage, #{position := FinalPos} = LastMeta} = ?Q:query(Q, #{
position => LastPos, limit => 10
}),
LastMsg = lists:nth(4, LastPartialPage),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))),
?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta),
?assertMatch(
{[], #{continuation := end_of_data}},
?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10})
?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
?assertEqual(#{position => {mqueue_ts(LastMsg), 0}, start => LastStart}, LastMeta),
?assertEqual(
{[], #{start => LastStart, position => FinalPos}},
?Q:query(Q, #{position => FinalPos, limit => 10})
),
{LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}),
{LargePage, LargeMeta} = ?Q:query(Q, #{position => none, limit => 1000}),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertMatch(#{continuation := end_of_data}, LargeMeta),
?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
{FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}),
?assertEqual(114, length(FullPage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))),
?assertMatch(#{continuation := end_of_data}, FullMeta),
{FullPage, FullMeta} = ?Q:query(Q, #{position => none, limit => 114}),
?assertEqual(LargePage, FullPage),
?assertEqual(LargeMeta, FullMeta),
{EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}),
?assertEqual([], EmptyPage),
?assertMatch(#{continuation := none, count := 114}, EmptyMeta).
{_, Q1} = emqx_mqueue:out(Q),
{PageAfterRemove, #{start := StartAfterRemove}} = ?Q:query(Q1, #{position => none, limit => 10}),
?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
?assertEqual(StartAfterRemove, {mqueue_ts(hd(PageAfterRemove)), 0}).
t_query_with_priorities(_) ->
Priorities = #{<<"t/infinity">> => infinity, <<"t/10">> => 10, <<"t/5">> => 5},
EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true, priorities => Priorities}),
?assertEqual({[], #{position => none, start => none}}, ?Q:query(EmptyQ, #{limit => 50})),
RandPos = {erlang:system_time(nanosecond), 0},
?assertEqual(
{[], #{position => RandPos, start => none}},
?Q:query(EmptyQ, #{position => RandPos, limit => 50})
),
?assertEqual(
{[], #{position => none, start => none}},
?Q:query(EmptyQ, #{continuation => none, limit => 50})
),
{Q, ExpMsgsAcc} = lists:foldl(
fun(Topic, {QAcc, MsgsAcc}) ->
{TopicQ, TopicMsgs} =
lists:foldl(
fun(Seq, {TopicQAcc, TopicMsgsAcc}) ->
Payload = <<Topic/binary, "_", (integer_to_binary(Seq))/binary>>,
Msg = emqx_message:make(Topic, Payload),
{_, TopicQAcc1} = ?Q:in(Msg, TopicQAcc),
{TopicQAcc1, [Msg | TopicMsgsAcc]}
end,
{QAcc, []},
lists:seq(1, 10)
),
{TopicQ, [lists:reverse(TopicMsgs) | MsgsAcc]}
end,
{EmptyQ, []},
[<<"t/test">>, <<"t/5">>, <<"t/infinity">>, <<"t/10">>]
),
%% Manual resorting from the highest to the lowest priority
[ExpMsgsPrio0, ExpMsgsPrio5, ExpMsgsPrioInf, ExpMsgsPrio10] = lists:reverse(ExpMsgsAcc),
ExpMsgs = ExpMsgsPrioInf ++ ExpMsgsPrio10 ++ ExpMsgsPrio5 ++ ExpMsgsPrio0,
{AllMsgs, #{start := StartPos, position := Pos}} = ?Q:query(Q, #{position => none, limit => 40}),
?assertEqual(40, length(AllMsgs)),
?assertEqual(ExpMsgs, with_empty_extra(AllMsgs)),
FirstMsg = hd(AllMsgs),
LastMsg = lists:last(AllMsgs),
?assertEqual(<<"t/infinity_1">>, emqx_message:payload(FirstMsg)),
?assertEqual(StartPos, {mqueue_ts(FirstMsg), infinity}),
?assertEqual(<<"t/test_10">>, emqx_message:payload(LastMsg)),
?assertMatch({_, 0}, Pos),
?assertEqual(Pos, {mqueue_ts(LastMsg), mqueue_prio(LastMsg)}),
Pos5 = {mqueue_ts(lists:nth(5, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
LastInfPos = {mqueue_ts(lists:nth(10, AllMsgs)), mqueue_prio(lists:nth(5, AllMsgs))},
{MsgsPrioInfTo10, #{start := StartPos, position := PosPrio10Msg5}} = ?Q:query(Q, #{
position => Pos5, limit => 10
}),
?assertEqual(10, length(MsgsPrioInfTo10)),
?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo10))),
?assertEqual(<<"t/10_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo10))),
?assertEqual(PosPrio10Msg5, {
mqueue_ts(lists:last(MsgsPrioInfTo10)), mqueue_prio(lists:last(MsgsPrioInfTo10))
}),
{MsgsPrioInfTo5, #{start := StartPos, position := PosPrio5Msg5}} = ?Q:query(Q, #{
position => Pos5, limit => 20
}),
?assertEqual(20, length(MsgsPrioInfTo5)),
?assertEqual(<<"t/infinity_6">>, emqx_message:payload(hd(MsgsPrioInfTo5))),
?assertEqual(<<"t/5_5">>, emqx_message:payload(lists:last(MsgsPrioInfTo5))),
?assertEqual(PosPrio5Msg5, {
mqueue_ts(lists:last(MsgsPrioInfTo5)), mqueue_prio(lists:last(MsgsPrioInfTo5))
}),
{MsgsPrio10, #{start := StartPos, position := PosPrio10}} = ?Q:query(Q, #{
position => LastInfPos, limit => 10
}),
?assertEqual(ExpMsgsPrio10, with_empty_extra(MsgsPrio10)),
?assertEqual(10, length(MsgsPrio10)),
?assertEqual(<<"t/10_1">>, emqx_message:payload(hd(MsgsPrio10))),
?assertEqual(<<"t/10_10">>, emqx_message:payload(lists:last(MsgsPrio10))),
?assertEqual(PosPrio10, {mqueue_ts(lists:last(MsgsPrio10)), mqueue_prio(lists:last(MsgsPrio10))}),
{MsgsPrio10To5, #{start := StartPos, position := _}} = ?Q:query(Q, #{
position => LastInfPos, limit => 20
}),
?assertEqual(ExpMsgsPrio10 ++ ExpMsgsPrio5, with_empty_extra(MsgsPrio10To5)).
conservation_prop() ->
?FORALL(
@ -413,3 +509,9 @@ drain(Q) ->
{{value, #message{topic = T, payload = P}}, Q1} ->
[{T, P} | drain(Q1)]
end.
mqueue_ts(#message{extra = #{mqueue_insert_ts := Ts}}) -> Ts.
mqueue_prio(#message{extra = #{mqueue_priority := Prio}}) -> Prio.
with_empty_extra(Msgs) ->
[M#message{extra = #{}} || M <- Msgs].

View File

@ -30,7 +30,8 @@ all() ->
{group, routing_schema_v1},
{group, routing_schema_v2},
t_routing_schema_switch_v1,
t_routing_schema_switch_v2
t_routing_schema_switch_v2,
t_routing_schema_consistent_clean_cluster
].
groups() ->
@ -477,6 +478,60 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
ok = emqx_cth_cluster:stop(Nodes)
end.
t_routing_schema_consistent_clean_cluster(Config) ->
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
% Start first node with routing schema v1
[Node1] = emqx_cth_cluster:start(
[
{routing_schema_consistent1, #{
apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, v1)]
}}
],
#{work_dir => WorkDir}
),
% Start rest of nodes with routing schema v2
NodesRest = emqx_cth_cluster:start(
[
{routing_schema_consistent2, #{
apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, v2)],
base_port => 20000,
join_to => Node1
}},
{routing_schema_consistent3, #{
apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, v2)],
base_port => 20100,
join_to => Node1
}}
],
#{work_dir => WorkDir}
),
Nodes = [Node1 | NodesRest],
try
% Verify that cluser is still on v1
?assertEqual(
[{ok, v1} || _ <- Nodes],
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
),
% Wait for all nodes to agree on cluster state
?retry(
500,
10,
?assertEqual(
[{ok, Nodes} || _ <- Nodes],
erpc:multicall(Nodes, emqx, running_nodes, [])
)
),
C1 = start_client(Node1),
C2 = start_client(hd(NodesRest)),
ok = subscribe(C2, <<"t/#">>),
{ok, _} = publish(C1, <<"t/a/b/c">>, <<"yayconsistency">>),
?assertReceive({pub, C2, #{topic := <<"t/a/b/c">>, payload := <<"yayconsistency">>}}),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2)
after
ok = emqx_cth_cluster:stop(Nodes)
end.
t_slow_rlog_routing_consistency(init, Config) ->
[Core1, _Core2, _Replicant] = ?config(cluster, Config),
MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]),

View File

@ -19,6 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -116,6 +117,80 @@ t_session_stats(_) ->
maps:from_list(Stats)
).
t_session_inflight_query(_) ->
EmptyInflight = emqx_inflight:new(500),
Session = session(#{inflight => EmptyInflight}),
EmptyQueryResMeta = {[], #{position => none, start => none}},
?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
?assertEqual(EmptyQueryResMeta, inflight_query(Session, none, 10)),
RandPos = erlang:system_time(nanosecond),
?assertEqual({[], #{position => RandPos, start => none}}, inflight_query(Session, RandPos, 10)),
Inflight = lists:foldl(
fun(Seq, Acc) ->
Msg = emqx_message:make(clientid, ?QOS_2, <<"t">>, integer_to_binary(Seq)),
emqx_inflight:insert(Seq, emqx_session_mem:with_ts(Msg), Acc)
end,
EmptyInflight,
lists:seq(1, 114)
),
Session1 = session(#{inflight => Inflight}),
{LastPos, LastStart} = lists:foldl(
fun(PageSeq, {Pos, PrevStart}) ->
Limit = 10,
{Page, #{position := NextPos, start := Start}} = inflight_query(Session1, Pos, Limit),
?assertEqual(10, length(Page)),
ExpFirst = PageSeq * Limit - Limit + 1,
ExpLast = PageSeq * Limit,
FirstMsg = lists:nth(1, Page),
LastMsg = lists:nth(10, Page),
?assertEqual(integer_to_binary(ExpFirst), emqx_message:payload(FirstMsg)),
?assertEqual(integer_to_binary(ExpLast), emqx_message:payload(LastMsg)),
%% start value must not change as Inflight is not modified during traversal
NextStart =
case PageSeq of
1 ->
?assertEqual(inflight_ts(FirstMsg), Start),
Start;
_ ->
?assertEqual(PrevStart, Start),
PrevStart
end,
?assertEqual(inflight_ts(LastMsg), NextPos),
{NextPos, NextStart}
end,
{none, none},
lists:seq(1, 11)
),
{LastPartialPage, #{position := FinalPos} = LastMeta} = inflight_query(
Session1, LastPos, 10
),
LastMsg = lists:nth(4, LastPartialPage),
?assertEqual(4, length(LastPartialPage)),
?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))),
?assertEqual(<<"114">>, emqx_message:payload(LastMsg)),
?assertEqual(#{position => inflight_ts(LastMsg), start => LastStart}, LastMeta),
?assertEqual(
{[], #{start => LastStart, position => FinalPos}},
inflight_query(Session1, FinalPos, 10)
),
{LargePage, LargeMeta} = inflight_query(Session1, none, 1000),
?assertEqual(114, length(LargePage)),
?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))),
?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))),
?assertEqual(#{start => LastStart, position => FinalPos}, LargeMeta),
{FullPage, FullMeta} = inflight_query(Session1, none, 114),
?assertEqual(LargePage, FullPage),
?assertEqual(LargeMeta, FullMeta),
Session2 = session(#{inflight => emqx_inflight:delete(1, Inflight)}),
{PageAfterRemove, #{start := StartAfterRemove}} = inflight_query(Session2, none, 10),
?assertEqual(<<"2">>, emqx_message:payload(hd(PageAfterRemove))),
?assertEqual(StartAfterRemove, inflight_ts(hd(PageAfterRemove))).
%%--------------------------------------------------------------------
%% Test cases for sub/unsub
%%--------------------------------------------------------------------
@ -275,9 +350,10 @@ t_pubrel_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session_mem:pubrel(1, session()).
t_pubcomp(_) ->
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()),
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(1, with_ts(wait_comp, Msg), emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, undefined, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
{ok, Msg, [], Session1} = emqx_session_mem:pubcomp(clientinfo(), 1, Session),
?assertEqual(0, emqx_session_mem:info(inflight_cnt, Session1)).
t_pubcomp_error_packetid_in_use(_) ->
@ -600,3 +676,8 @@ set_duplicate_pub({Id, Msg}) ->
get_packet_id({Id, _}) ->
Id.
inflight_query(Session, Pos, Limit) ->
emqx_session_mem:info({inflight_msgs, #{position => Pos, limit => Limit}}, Session).
inflight_ts(#message{extra = #{inflight_insert_ts := Ts}}) -> Ts.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.2.2"},
{vsn, "0.3.0"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_http, [
{description, "EMQX External HTTP API Authentication and Authorization"},
{vsn, "0.1.4"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_auth_http_app, []}},
{applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.34"},
{vsn, "0.2.0"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_cassandra, [
{description, "EMQX Enterprise Cassandra Bridge"},
{vsn, "0.2.0"},
{vsn, "0.3.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_clickhouse, [
{description, "EMQX Enterprise ClickHouse Bridge"},
{vsn, "0.3.0"},
{vsn, "0.4.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_dynamo, [
{description, "EMQX Enterprise Dynamo Bridge"},
{vsn, "0.1.5"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.2.2"},
{vsn, "0.3.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -3,7 +3,8 @@
{erl_opts, [debug_info]}.
{deps, [
{hstreamdb_erl,
{git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}},
{git, "https://github.com/hstreamdb/hstreamdb_erl.git",
{tag, "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}}},
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.1.4"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.2.4"},
{vsn, "0.3.0"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, ehttpc]},
{env, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.1.7"},
{vsn, "0.2.0"},
{modules, [
emqx_bridge_iotdb,
emqx_bridge_iotdb_connector

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.2.2"},
{vsn, "0.3.0"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,

View File

@ -28,6 +28,17 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
%% Ancient v1 config, when `kafka' key was wrapped by `producer'
BridgeV1Conf1 = emqx_utils_maps:unindent(<<"producer">>, BridgeV1Conf0),
BridgeV1Conf =
case maps:take(<<"mqtt">>, BridgeV1Conf1) of
{#{<<"topic">> := Topic}, BridgeV1Conf2} when is_binary(Topic) ->
BridgeV1Conf2#{<<"local_topic">> => Topic};
_ ->
maps:remove(<<"mqtt">>, BridgeV1Conf1)
end,
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName);
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer

View File

@ -6,7 +6,8 @@
-include_lib("eunit/include/eunit.hrl").
-export([atoms/0]).
-export([atoms/0, kafka_producer_old_hocon/1]).
%% ensure atoms exist
atoms() -> [myproducer, my_consumer].

View File

@ -36,6 +36,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"),
@ -79,9 +80,22 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(t_ancient_v1_config_migration_with_local_topic = TestCase, Config) ->
Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => true}),
[{cluster, Cluster} | Config];
init_per_testcase(t_ancient_v1_config_migration_without_local_topic = TestCase, Config) ->
Cluster = setup_cluster_ancient_config(TestCase, Config, #{with_local_topic => false}),
[{cluster, Cluster} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(TestCase, Config) when
TestCase =:= t_ancient_v1_config_migration_with_local_topic;
TestCase =:= t_ancient_v1_config_migration_without_local_topic
->
Cluster = ?config(cluster, Config),
emqx_cth_cluster:stop(Cluster),
ok;
end_per_testcase(_TestCase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
@ -94,6 +108,32 @@ end_per_testcase(_TestCase, Config) ->
%% Helper fns
%%-------------------------------------------------------------------------------------
basic_node_conf(WorkDir) ->
#{
<<"node">> => #{
<<"cookie">> => erlang:get_cookie(),
<<"data_dir">> => unicode:characters_to_binary(WorkDir)
}
}.
setup_cluster_ancient_config(TestCase, Config, #{with_local_topic := WithLocalTopic}) ->
AncientIOList = emqx_bridge_kafka_tests:kafka_producer_old_hocon(WithLocalTopic),
{ok, AncientCfg0} = hocon:binary(AncientIOList),
WorkDir = emqx_cth_suite:work_dir(TestCase, Config),
BasicConf = basic_node_conf(WorkDir),
AncientCfg = emqx_utils_maps:deep_merge(BasicConf, AncientCfg0),
Apps = [
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_kafka,
{emqx_bridge, #{schema_mod => emqx_enterprise_schema, config => AncientCfg}}
],
emqx_cth_cluster:start(
[{kafka_producer_ancient_cfg1, #{apps => Apps}}],
#{work_dir => WorkDir}
).
check_send_message_with_bridge(BridgeName) ->
#{offset := Offset, payload := Payload} = send_message(BridgeName),
%% ######################################
@ -578,3 +618,23 @@ t_create_connector_while_connection_is_down(Config) ->
[]
),
ok.
t_ancient_v1_config_migration_with_local_topic(Config) ->
%% Simply starting this test case successfully is enough, as the core of the test is
%% to be able to successfully start the node with the ancient config.
[Node] = ?config(cluster, Config),
?assertMatch(
[#{type := <<"kafka_producer">>}],
erpc:call(Node, fun emqx_bridge_v2:list/0)
),
ok.
t_ancient_v1_config_migration_without_local_topic(Config) ->
%% Simply starting this test case successfully is enough, as the core of the test is
%% to be able to successfully start the node with the ancient config.
[Node] = ?config(cluster, Config),
?assertMatch(
[#{type := <<"kafka_producer">>}],
erpc:call(Node, fun emqx_bridge_v2:list/0)
),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.1.4"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mongodb, [
{description, "EMQX Enterprise MongoDB Bridge"},
{vsn, "0.2.4"},
{vsn, "0.3.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.9"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_opents, [
{description, "EMQX Enterprise OpenTSDB Bridge"},
{vsn, "0.1.4"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_oracle, [
{description, "EMQX Enterprise Oracle Database Bridge"},
{vsn, "0.1.5"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.9"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_bridge_rabbitmq_app, []}},
{applications, [

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.5"},
{vsn, "0.2.0"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_sqlserver, [
{description, "EMQX Enterprise SQL Server Bridge"},
{vsn, "0.1.6"},
{vsn, "0.2.0"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, odbc]},
{env, [

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.1.7"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.35"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]},

View File

@ -79,6 +79,7 @@
-define(DEFAULT_MAX_PORTS, 1024 * 1024).
-define(LOG_THROTTLING_MSGS, [
authentication_failure,
authorization_permission_denied,
cannot_publish_to_topic_due_to_not_authorized,
cannot_publish_to_topic_due_to_quota_exceeded,
@ -1277,6 +1278,15 @@ log_handler_common_confs(Handler, Default) ->
importance => ?IMPORTANCE_MEDIUM
}
)},
{"timestamp_format",
sc(
hoconsc:enum([auto, epoch, rfc3339]),
#{
default => auto,
desc => ?DESC("common_handler_timestamp_format"),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"time_offset",
sc(
string(),

View File

@ -77,7 +77,8 @@ t_log_conf(_Conf) ->
<<"rotation_count">> => 10,
<<"rotation_size">> => <<"50MB">>,
<<"time_offset">> => <<"system">>,
<<"path">> => <<"log/emqx.log">>
<<"path">> => <<"log/emqx.log">>,
<<"timestamp_format">> => <<"auto">>
},
ExpectLog1 = #{
<<"console">> =>
@ -85,7 +86,8 @@ t_log_conf(_Conf) ->
<<"enable">> => true,
<<"formatter">> => <<"text">>,
<<"level">> => <<"debug">>,
<<"time_offset">> => <<"system">>
<<"time_offset">> => <<"system">>,
<<"timestamp_format">> => <<"auto">>
},
<<"file">> =>
#{<<"default">> => FileExpect},

View File

@ -131,8 +131,9 @@ log.file_handlers {
chars_limit => unlimited,
depth => 100,
single_line => true,
template => [time, " [", level, "] ", msg, "\n"],
time_offset => TimeOffset
template => ["[", level, "] ", msg, "\n"],
time_offset => TimeOffset,
timestamp_format => auto
}}
).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.2.0"},
{vsn, "0.3.0"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -238,7 +238,7 @@ transform_bridge_v1_config_to_action_config(
ActionMap0 = lists:foldl(
fun
({enable, _Spec}, ToTransformSoFar) ->
%% Enable filed is used in both
%% Enable field is used in both
ToTransformSoFar;
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
ConnectorFieldNameBin = to_bin(ConnectorFieldName),

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
{vsn, "5.0.33"},
{vsn, "5.1.0"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [

View File

@ -178,36 +178,27 @@ fields(hasnext) ->
>>,
Meta = #{desc => Desc, required => true},
[{hasnext, hoconsc:mk(boolean(), Meta)}];
fields('after') ->
Desc = <<
"The value of \"last\" field returned in the previous response. It can then be used"
" in subsequent requests to get the next chunk of results.<br/>"
"It is used instead of \"page\" parameter to traverse volatile data.<br/>"
"Can be omitted or set to \"none\" to get the first chunk of data.<br/>"
"\last\" = end_of_data\" is returned, if there is no more data.<br/>"
"Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
" error response."
>>,
Meta = #{
in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">>
},
[{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(last) ->
fields(position) ->
Desc = <<
"An opaque token that can then be in subsequent requests to get "
" the next chunk of results: \"?after={last}\"<br/>"
"if there is no more data, \"last\" = end_of_data\" is returned.<br/>"
"Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\""
" error response."
" the next chunk of results: \"?position={prev_response.meta.position}\"<br/>"
"It is used instead of \"page\" parameter to traverse highly volatile data.<br/>"
"Can be omitted or set to \"none\" to get the first chunk of data."
>>,
Meta = #{
desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">>
in => query, desc => Desc, required => false, example => <<"none">>
},
[{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
[{position, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}];
fields(start) ->
Desc = <<"The position of the current first element of the data collection.">>,
Meta = #{
desc => Desc, required => true, example => <<"none">>
},
[{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
fields(continuation_meta) ->
fields(last) ++ fields(count).
fields(start) ++ fields(position).
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
schema_with_example(Type, Example) ->

View File

@ -2,7 +2,7 @@
{application, emqx_durable_storage, [
{description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually!
{vsn, "0.1.12"},
{vsn, "0.2.0"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},

View File

@ -1,6 +1,6 @@
{application, emqx_enterprise, [
{description, "EMQX Enterprise Edition"},
{vsn, "0.1.7"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -78,7 +78,8 @@ t_audit_log_conf(_Config) ->
<<"rotation_count">> => 10,
<<"rotation_size">> => <<"50MB">>,
<<"time_offset">> => <<"system">>,
<<"path">> => <<"log/emqx.log">>
<<"path">> => <<"log/emqx.log">>,
<<"timestamp_format">> => <<"auto">>
},
ExpectLog1 = #{
<<"console">> =>
@ -86,7 +87,8 @@ t_audit_log_conf(_Config) ->
<<"enable">> => false,
<<"formatter">> => <<"text">>,
<<"level">> => <<"warning">>,
<<"time_offset">> => <<"system">>
<<"time_offset">> => <<"system">>,
<<"timestamp_format">> => <<"auto">>
},
<<"file">> =>
#{<<"default">> => FileExpect},
@ -99,7 +101,8 @@ t_audit_log_conf(_Config) ->
<<"max_filter_size">> => 5000,
<<"rotation_count">> => 10,
<<"rotation_size">> => <<"50MB">>,
<<"time_offset">> => <<"system">>
<<"time_offset">> => <<"system">>,
<<"timestamp_format">> => <<"auto">>
}
},
%% The default value of throttling.msgs can be frequently updated,

View File

@ -234,7 +234,7 @@ parse_data(
<<Year:?BYTE, Month:?BYTE, Day:?BYTE, Hour:?BYTE, Minute:?BYTE, Second:?BYTE, Total:?BYTE,
Rest/binary>>
) ->
%% XXX: need check ACK filed?
%% XXX: need check ACK field?
#{
<<"Time">> => #{
<<"Year">> => Year,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang; -*-
{deps, [
{jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
{jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.8.0"}}},
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}

View File

@ -80,24 +80,29 @@ update_setting(Setting) when is_map(Setting) ->
check(_ConnInfo, AckProps) ->
case emqx_license_checker:limits() of
{ok, #{max_connections := ?ERR_EXPIRED}} ->
?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}),
?SLOG(error, #{msg => "connection_rejected_due_to_license_expired"}, #{tag => "LICENSE"}),
{stop, {error, ?RC_QUOTA_EXCEEDED}};
{ok, #{max_connections := MaxClients}} ->
case check_max_clients_exceeded(MaxClients) of
true ->
?SLOG_THROTTLE(
error,
#{msg => connection_rejected_due_to_license_limit_reached}
#{msg => connection_rejected_due_to_license_limit_reached},
#{tag => "LICENSE"}
),
{stop, {error, ?RC_QUOTA_EXCEEDED}};
false ->
{ok, AckProps}
end;
{error, Reason} ->
?SLOG(error, #{
msg => "connection_rejected_due_to_license_not_loaded",
reason => Reason
}),
?SLOG(
error,
#{
msg => "connection_rejected_due_to_license_not_loaded",
reason => Reason
},
#{tag => "LICENSE"}
),
{stop, {error, ?RC_QUOTA_EXCEEDED}}
end.

View File

@ -172,11 +172,15 @@ refresh(State) ->
State.
log_new_license(Old, New) ->
?SLOG(info, #{
msg => "new_license_loaded",
old_license => emqx_license_parser:summary(Old),
new_license => emqx_license_parser:summary(New)
}).
?SLOG(
info,
#{
msg => "new_license_loaded",
old_license => emqx_license_parser:summary(Old),
new_license => emqx_license_parser:summary(New)
},
#{tag => "LICENSE"}
).
ensure_check_license_timer(#{check_license_interval := CheckInterval} = State) ->
ok = cancel_timer(State, check_timer),

View File

@ -129,13 +129,17 @@ error_msg(Code, Msg) ->
'/license'(post, #{body := #{<<"key">> := Key}}) ->
case emqx_license:update_key(Key) of
{error, Error} ->
?SLOG(error, #{
msg => "bad_license_key",
reason => Error
}),
?SLOG(
error,
#{
msg => "bad_license_key",
reason => Error
},
#{tag => "LICENSE"}
),
{400, error_msg(?BAD_REQUEST, <<"Bad license key">>)};
{ok, _} ->
?SLOG(info, #{msg => "updated_license_key"}),
?SLOG(info, #{msg => "updated_license_key"}, #{tag => "LICENSE"}),
License = maps:from_list(emqx_license_checker:dump()),
{200, License}
end;
@ -147,13 +151,17 @@ error_msg(Code, Msg) ->
'/license/setting'(put, #{body := Setting}) ->
case emqx_license:update_setting(Setting) of
{error, Error} ->
?SLOG(error, #{
msg => "bad_license_setting",
reason => Error
}),
?SLOG(
error,
#{
msg => "bad_license_setting",
reason => Error
},
#{tag => "LICENSE"}
),
{400, error_msg(?BAD_REQUEST, <<"Bad license setting">>)};
{ok, _} ->
?SLOG(info, #{msg => "updated_license_setting"}),
?SLOG(info, #{msg => "updated_license_setting"}, #{tag => "LICENSE"}),
'/license/setting'(get, undefined)
end.

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.2.19"},
{vsn, "0.3.0"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -27,6 +27,7 @@
]).
-export([open_ports_check/0]).
-export([mria_lb_custom_info/0, mria_lb_custom_info_check/1]).
-ifdef(TEST).
-export([create_plan/0]).
@ -51,6 +52,13 @@ start() ->
configure_shard_transports(),
set_mnesia_extra_diagnostic_checks(),
emqx_otel_app:configure_otel_deps(),
%% Register mria callbacks that help to check compatibility of the
%% replicant with the core node. Currently they rely on the exact
%% match of the version of EMQX OTP application:
_ = application:load(mria),
_ = application:load(emqx),
mria_config:register_callback(lb_custom_info, fun ?MODULE:mria_lb_custom_info/0),
mria_config:register_callback(lb_custom_info_check, fun ?MODULE:mria_lb_custom_info_check/1),
ekka:start(),
ok.
@ -227,3 +235,21 @@ resolve_dist_address_type() ->
_ ->
inet
end.
%% Note: this function is stored in the Mria's application environment
mria_lb_custom_info() ->
get_emqx_vsn().
%% Note: this function is stored in the Mria's application environment
mria_lb_custom_info_check(undefined) ->
false;
mria_lb_custom_info_check(OtherVsn) ->
get_emqx_vsn() =:= OtherVsn.
get_emqx_vsn() ->
case application:get_key(emqx, vsn) of
{ok, Vsn} ->
Vsn;
undefined ->
undefined
end.

View File

@ -15,6 +15,3 @@
%%--------------------------------------------------------------------
-define(DEFAULT_ROW_LIMIT, 100).
-define(URL_PARAM_INTEGER, url_param_integer).
-define(URL_PARAM_BINARY, url_param_binary).

View File

@ -2,7 +2,7 @@
{application, emqx_management, [
{description, "EMQX Management API and CLI"},
% strict semver, bump manually!
{vsn, "5.1.0"},
{vsn, "5.2.0"},
{modules, []},
{registered, [emqx_management_sup]},
{applications, [

View File

@ -359,6 +359,10 @@ kickout_client(ClientId) ->
case lookup_client({clientid, ClientId}, undefined) of
[] ->
{error, not_found};
[{ClientId, _}] ->
%% Offline durable session (client ID is a plain binary
%% without channel pid):
emqx_persistent_session_ds:kick_offline_session(ClientId);
_ ->
Results = [kickout_client(Node, ClientId) || Node <- emqx:running_nodes()],
check_results(Results)
@ -372,6 +376,7 @@ kickout_clients(ClientIds) when is_list(ClientIds) ->
emqx_management_proto_v5:kickout_clients(Node, ClientIds)
end,
Results = lists:map(F, emqx:running_nodes()),
lists:foreach(fun emqx_persistent_session_ds:kick_offline_session/1, ClientIds),
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
[] ->
ok;
@ -509,7 +514,7 @@ do_call_client(ClientId, Req) ->
Pid = lists:last(Pids),
case emqx_cm:get_chan_info(ClientId, Pid) of
#{conninfo := #{conn_mod := ConnMod}} ->
erlang:apply(ConnMod, call, [Pid, Req]);
call_conn(ConnMod, Pid, Req);
undefined ->
{error, not_found}
end
@ -698,3 +703,13 @@ check_results(Results) ->
default_row_limit() ->
?DEFAULT_ROW_LIMIT.
call_conn(ConnMod, Pid, Req) ->
try
erlang:apply(ConnMod, call, [Pid, Req])
catch
exit:R when R =:= shutdown; R =:= normal ->
{error, shutdown};
exit:{R, _} when R =:= shutdown; R =:= noproc ->
{error, shutdown}
end.

View File

@ -39,7 +39,6 @@
-export([
parse_pager_params/1,
parse_cont_pager_params/2,
encode_cont_pager_params/2,
parse_qstring/2,
init_query_result/0,
init_query_state/5,
@ -138,32 +137,18 @@ page(Params) ->
limit(Params) when is_map(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
continuation(Params, Encoding) ->
position(Params, Decoder) ->
try
decode_continuation(maps:get(<<"after">>, Params, none), Encoding)
decode_position(maps:get(<<"position">>, Params, none), Decoder)
catch
_:_ ->
error
end.
decode_continuation(none, _Encoding) ->
decode_position(none, _Decoder) ->
none;
decode_continuation(end_of_data, _Encoding) ->
%% Clients should not send "after=end_of_data" back to the server
error;
decode_continuation(Cont, ?URL_PARAM_INTEGER) ->
binary_to_integer(Cont);
decode_continuation(Cont, ?URL_PARAM_BINARY) ->
emqx_utils:hexstr_to_bin(Cont).
encode_continuation(none, _Encoding) ->
none;
encode_continuation(end_of_data, _Encoding) ->
end_of_data;
encode_continuation(Cont, ?URL_PARAM_INTEGER) ->
integer_to_binary(Cont);
encode_continuation(Cont, ?URL_PARAM_BINARY) ->
emqx_utils:bin_to_hexstr(Cont, lower).
decode_position(Pos, Decoder) ->
Decoder(Pos).
%%--------------------------------------------------------------------
%% Node Query
@ -670,25 +655,18 @@ parse_pager_params(Params) ->
false
end.
-spec parse_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) ->
#{limit := pos_integer(), continuation := none | end_of_table | binary()} | false.
parse_cont_pager_params(Params, Encoding) ->
Cont = continuation(Params, Encoding),
-spec parse_cont_pager_params(map(), fun((binary()) -> term())) ->
#{limit := pos_integer(), position := none | term()} | false.
parse_cont_pager_params(Params, PositionDecoder) ->
Pos = position(Params, PositionDecoder),
Limit = b2i(limit(Params)),
case Limit > 0 andalso Cont =/= error of
case Limit > 0 andalso Pos =/= error of
true ->
#{continuation => Cont, limit => Limit};
#{position => Pos, limit => Limit};
false ->
false
end.
-spec encode_cont_pager_params(map(), ?URL_PARAM_INTEGER | ?URL_PARAM_BINARY) -> map().
encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) ->
Meta1 = maps:remove(continuation, Meta),
Meta1#{last => encode_continuation(Cont, ContEncoding)};
encode_cont_pager_params(Meta, _ContEncoding) ->
Meta.
%%--------------------------------------------------------------------
%% Types
%%--------------------------------------------------------------------

View File

@ -90,6 +90,11 @@
message => <<"Client ID not found">>
}).
-define(CLIENT_SHUTDOWN, #{
code => 'CLIENT_SHUTDOWN',
message => <<"Client connection has been shutdown">>
}).
namespace() -> undefined.
api_spec() ->
@ -413,11 +418,11 @@ schema("/clients/:clientid/keepalive") ->
}
};
schema("/clients/:clientid/mqueue_messages") ->
ContExample = <<"AAYS53qRa0n07AAABFIACg">>,
ContExample = <<"1710785444656449826_10">>,
RespSchema = ?R_REF(mqueue_messages),
client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema);
schema("/clients/:clientid/inflight_messages") ->
ContExample = <<"10">>,
ContExample = <<"1710785444656449826">>,
RespSchema = ?R_REF(inflight_messages),
client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema);
schema("/sessions_count") ->
@ -716,7 +721,7 @@ fields(unsubscribe) ->
];
fields(mqueue_messages) ->
[
{data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})},
{data, hoconsc:mk(hoconsc:array(?REF(mqueue_message)), #{desc => ?DESC(mqueue_msgs_list)})},
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})}
];
fields(inflight_messages) ->
@ -732,8 +737,18 @@ fields(message) ->
{publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})},
{from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
{from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})},
{inserted_at, hoconsc:mk(binary(), #{desc => ?DESC(msg_inserted_at)})}
];
fields(mqueue_message) ->
fields(message) ++
[
{mqueue_priority,
hoconsc:mk(
hoconsc:union([integer(), infinity]),
#{desc => ?DESC(msg_mqueue_priority)}
)}
];
fields(requested_client_fields) ->
%% NOTE: some Client fields actually returned in response are missing in schema:
%% enable_authn, is_persistent, listener, peerport
@ -980,7 +995,7 @@ client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
responses => #{
200 =>
emqx_dashboard_swagger:schema_with_example(RespSchema, #{
<<"data">> => [message_example()],
<<"data">> => [message_example(OpId)],
<<"meta">> => #{
<<"count">> => 100,
<<"last">> => ContExample
@ -991,7 +1006,10 @@ client_msgs_schema(OpId, Desc, ContExample, RespSchema) ->
['INVALID_PARAMETER'], <<"Invalid parameters">>
),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
['CLIENTID_NOT_FOUND', 'CLIENT_SHUTDOWN'], <<"Client ID not found">>
),
?NOT_IMPLEMENTED => emqx_dashboard_swagger:error_codes(
['NOT_IMPLEMENTED'], <<"API not implemented">>
)
}
}
@ -1023,7 +1041,7 @@ client_msgs_params() ->
>>,
validator => fun max_bytes_validator/1
})},
hoconsc:ref(emqx_dashboard_swagger, 'after'),
hoconsc:ref(emqx_dashboard_swagger, position),
hoconsc:ref(emqx_dashboard_swagger, limit)
].
@ -1260,22 +1278,53 @@ is_live_session(ClientId) ->
[] =/= emqx_cm_registry:lookup_channels(ClientId).
list_client_msgs(MsgType, ClientID, QString) ->
case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
case emqx_mgmt_api:parse_cont_pager_params(QString, pos_decoder(MsgType)) of
false ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}};
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"position_limit_invalid">>}};
PagerParams = #{} ->
case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of
{error, not_found} ->
{404, ?CLIENTID_NOT_FOUND};
{error, shutdown} ->
{404, ?CLIENT_SHUTDOWN};
{error, not_implemented} ->
{?NOT_IMPLEMENTED, #{
code => 'NOT_IMPLEMENTED',
message => <<"API not implemented for persistent sessions">>
}};
{Msgs, Meta = #{}} when is_list(Msgs) ->
format_msgs_resp(MsgType, Msgs, Meta, QString)
end
end.
%% integer packet id
cont_encoding(inflight_msgs) -> ?URL_PARAM_INTEGER;
%% binary message id
cont_encoding(mqueue_msgs) -> ?URL_PARAM_BINARY.
pos_decoder(mqueue_msgs) -> fun decode_mqueue_pos/1;
pos_decoder(inflight_msgs) -> fun decode_msg_pos/1.
encode_msgs_meta(_MsgType, #{start := StartPos, position := Pos}) ->
#{start => encode_pos(StartPos), position => encode_pos(Pos)}.
encode_pos(none) ->
none;
encode_pos({MsgPos, PrioPos}) ->
MsgPosBin = integer_to_binary(MsgPos),
PrioPosBin =
case PrioPos of
infinity -> <<"infinity">>;
_ -> integer_to_binary(PrioPos)
end,
<<MsgPosBin/binary, "_", PrioPosBin/binary>>;
encode_pos(Pos) when is_integer(Pos) ->
integer_to_binary(Pos).
-spec decode_mqueue_pos(binary()) -> {integer(), infinity | integer()}.
decode_mqueue_pos(Pos) ->
[MsgPos, PrioPos] = binary:split(Pos, <<"_">>),
{decode_msg_pos(MsgPos), decode_priority_pos(PrioPos)}.
decode_msg_pos(Pos) -> binary_to_integer(Pos).
decode_priority_pos(<<"infinity">>) -> infinity;
decode_priority_pos(Pos) -> binary_to_integer(Pos).
max_bytes_validator(MaxBytes) when is_integer(MaxBytes), MaxBytes > 0 ->
ok;
@ -1482,8 +1531,8 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
<<"payload">> := PayloadFmt,
<<"max_payload_bytes">> := MaxBytes
} = QString,
Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)),
Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)},
Meta1 = encode_msgs_meta(MsgType, Meta),
Resp = #{meta => Meta1, data => format_msgs(MsgType, Msgs, PayloadFmt, MaxBytes)},
%% Make sure minirest won't set another content-type for self-encoded JSON response body
Headers = #{<<"content-type">> => <<"application/json">>},
case emqx_utils_json:safe_encode(Resp) of
@ -1499,13 +1548,13 @@ format_msgs_resp(MsgType, Msgs, Meta, QString) ->
?INTERNAL_ERROR(Error)
end.
format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
format_msgs(MsgType, [FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
%% Always include at least one message payload, even if it exceeds the limit
{FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt),
{FirstMsg1, PayloadSize0} = format_msg(MsgType, FirstMsg, PayloadFmt),
{Msgs1, _} =
catch lists:foldl(
fun(Msg, {MsgsAcc, SizeAcc} = Acc) ->
{Msg1, PayloadSize} = format_msg(Msg, PayloadFmt),
{Msg1, PayloadSize} = format_msg(MsgType, Msg, PayloadFmt),
case SizeAcc + PayloadSize of
SizeAcc1 when SizeAcc1 =< MaxBytes ->
{[Msg1 | MsgsAcc], SizeAcc1};
@ -1517,10 +1566,11 @@ format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) ->
Msgs
),
lists:reverse(Msgs1);
format_msgs([], _PayloadFmt, _MaxBytes) ->
format_msgs(_MsgType, [], _PayloadFmt, _MaxBytes) ->
[].
format_msg(
MsgType,
#message{
id = ID,
qos = Qos,
@ -1529,10 +1579,10 @@ format_msg(
timestamp = Timestamp,
headers = Headers,
payload = Payload
},
} = Msg,
PayloadFmt
) ->
Msg = #{
MsgMap = #{
msgid => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
@ -1540,15 +1590,23 @@ format_msg(
from_clientid => emqx_utils_conv:bin(From),
from_username => maps:get(username, Headers, <<>>)
},
format_payload(PayloadFmt, Msg, Payload).
MsgMap1 = format_by_msg_type(MsgType, Msg, MsgMap),
format_payload(PayloadFmt, MsgMap1, Payload).
format_payload(none, Msg, _Payload) ->
{Msg, 0};
format_payload(base64, Msg, Payload) ->
format_by_msg_type(mqueue_msgs, Msg, MsgMap) ->
#message{extra = #{mqueue_priority := Prio, mqueue_insert_ts := Ts}} = Msg,
MsgMap#{mqueue_priority => Prio, inserted_at => integer_to_binary(Ts)};
format_by_msg_type(inflight_msgs, Msg, MsgMap) ->
#message{extra = #{inflight_insert_ts := Ts}} = Msg,
MsgMap#{inserted_at => integer_to_binary(Ts)}.
format_payload(none, MsgMap, _Payload) ->
{MsgMap, 0};
format_payload(base64, MsgMap, Payload) ->
Payload1 = base64:encode(Payload),
{Msg#{payload => Payload1}, erlang:byte_size(Payload1)};
format_payload(plain, Msg, Payload) ->
{Msg#{payload => Payload}, erlang:iolist_size(Payload)}.
{MsgMap#{payload => Payload1}, erlang:byte_size(Payload1)};
format_payload(plain, MsgMap, Payload) ->
{MsgMap#{payload => Payload}, erlang:iolist_size(Payload)}.
%% format func helpers
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->
@ -1652,6 +1710,11 @@ client_example() ->
<<"durable">> => false
}.
message_example(inflight_msgs) ->
message_example();
message_example(mqueue_msgs) ->
(message_example())#{<<"mqueue_priority">> => 0}.
message_example() ->
#{
<<"msgid">> => <<"000611F460D57FA9F44500000D360002">>,

View File

@ -96,6 +96,11 @@ fields(topic) ->
hoconsc:mk(binary(), #{
desc => <<"Node">>,
required => true
})},
{session,
hoconsc:mk(binary(), #{
desc => <<"Session ID">>,
required => false
})}
].
@ -113,8 +118,8 @@ do_list(Params) ->
try
Pager = parse_pager_params(Params),
{_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
QState = Pager#{continuation => undefined},
QResult = eval_topic_query(qs2ms(Query), QState),
Stream = mk_topic_stream(qs2ms(Query)),
QResult = eval_topic_query(Stream, Pager, emqx_mgmt_api:init_query_result()),
{200, format_list_response(Pager, Query, QResult)}
catch
throw:{error, page_limit_invalid} ->
@ -160,31 +165,48 @@ gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
{MTopic, QNode}.
eval_topic_query(MS, QState) ->
finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())).
mk_topic_stream(Spec = {MTopic, _MDest = '_'}) ->
emqx_utils_stream:chain(emqx_router:stream(Spec), mk_persistent_topic_stream(MTopic));
mk_topic_stream(Spec) ->
%% NOTE: Assuming that no persistent topic ever matches a query with `node` filter.
emqx_router:stream(Spec).
eval_topic_query(MS, QState, QResult) ->
case eval_topic_query_page(MS, QState) of
{Rows, '$end_of_table'} ->
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
NQResult#{complete => true};
{Rows, NCont} ->
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
{more, NQResult} ->
eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
{enough, NQResult} ->
NQResult#{complete => false}
end;
'$end_of_table' ->
QResult#{complete => true}
mk_persistent_topic_stream(Spec) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
emqx_persistent_session_ds_router:stream(Spec);
false ->
emqx_utils_stream:empty()
end.
eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) ->
emqx_router:select(MS, Limit, Cont).
eval_count() ->
emqx_router:stats(n_routes) + eval_persistent_count().
finalize_query(QResult = #{overflow := Overflow, complete := Complete}) ->
eval_persistent_count() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
emqx_persistent_session_ds_router:stats(n_routes);
false ->
0
end.
eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
case emqx_utils_stream:consume(Limit, Stream) of
{Rows, NStream} ->
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
{more, NQResult} ->
eval_topic_query(NStream, QState, NQResult);
{enough, NQResult} ->
finalize_query(false, NQResult)
end;
Rows when is_list(Rows) ->
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
finalize_query(true, NQResult)
end.
finalize_query(Complete, QResult = #{overflow := Overflow}) ->
HasNext = Overflow orelse not Complete,
QResult#{hasnext => HasNext}.
QResult#{complete => Complete, hasnext => HasNext}.
format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
#{
@ -198,14 +220,16 @@ format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
Meta#{hasnext => HasNext, count => Cursor};
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
Meta#{hasnext => HasNext, count => emqx_router:stats(n_routes)};
Meta#{hasnext => HasNext, count => eval_count()};
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
Meta#{hasnext => HasNext}.
format(#route{topic = Topic, dest = {Group, Node}}) ->
#{topic => ?SHARE(Group, Topic), node => Node};
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
#{topic => Topic, node => Node}.
#{topic => Topic, node => Node};
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
#{topic => Topic, session => SessionId}.
topic_param(In) ->
{

View File

@ -56,12 +56,10 @@ client_msgs_testcases() ->
].
init_per_suite(Config) ->
ok = snabbkaffe:start_trace(),
emqx_mgmt_api_test_util:init_suite(),
Config.
end_per_suite(_) ->
ok = snabbkaffe:stop(),
emqx_mgmt_api_test_util:end_suite().
init_per_group(persistent_sessions, Config) ->
@ -95,10 +93,15 @@ end_per_group(persistent_sessions, Config) ->
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(TC, _Config) when
TC =:= t_inflight_messages;
TC =:= t_mqueue_messages
->
ok = snabbkaffe:stop(),
ClientId = atom_to_binary(TC),
lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
ok = emqx_common_test_helpers:wait_for(
@ -108,7 +111,7 @@ end_per_testcase(TC, _Config) when
5000
);
end_per_testcase(_TC, _Config) ->
ok.
ok = snabbkaffe:stop().
t_clients(_) ->
process_flag(trap_exit, true),
@ -313,8 +316,7 @@ t_persistent_sessions2(Config) ->
%% 2) Client connects to the same node and takes over, listed only once.
C2 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}),
?retry(
100,
20,
@ -322,9 +324,7 @@ t_persistent_sessions2(Config) ->
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort)
)
),
ok
)
end,
[]
),
@ -360,10 +360,7 @@ t_persistent_sessions3(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1))
)
),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
end,
[]
),
@ -403,10 +400,7 @@ t_persistent_sessions4(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1))
)
),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
end,
[]
),
@ -1076,18 +1070,19 @@ t_mqueue_messages(Config) ->
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)),
IsMqueue = true,
test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=10&after=not-base64%23%21", AuthHeader
get, Path, "limit=10&position=not-valid", AuthHeader
)
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=-5&after=not-base64%23%21", AuthHeader
get, Path, "limit=-5&position=not-valid", AuthHeader
)
).
@ -1099,18 +1094,21 @@ t_inflight_messages(Config) ->
Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
InflightLimit = emqx:get_config([mqtt, max_inflight]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)),
IsMqueue = false,
test_messages(
Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=10&after=not-int", AuthHeader
get, Path, "limit=10&position=not-int", AuthHeader
)
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(
get, Path, "limit=-5&after=invalid-int", AuthHeader
get, Path, "limit=-5&position=invalid-int", AuthHeader
)
),
emqtt:stop(Client).
@ -1148,19 +1146,16 @@ publish_msgs(Topic, Count) ->
lists:seq(1, Count)
).
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
{ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
#{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
#{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
?assertMatch(
#{
<<"last">> := <<"end_of_data">>,
<<"count">> := Count
},
Meta
),
?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
?assertEqual(length(Msgs), Count),
lists:foreach(
fun({Seq, #{<<"payload">> := P} = M}) ->
?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
@ -1171,10 +1166,12 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
<<"qos">> := _,
<<"publish_at">> := _,
<<"from_clientid">> := _,
<<"from_username">> := _
<<"from_username">> := _,
<<"inserted_at">> := _
},
M
)
),
IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M)
end,
lists:zip(lists:seq(1, Count), Msgs)
),
@ -1189,62 +1186,69 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
get, Path, QsPayloadLimit, AuthHeader
),
#{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
ct:pal("~p", [FirstMsgOnly]),
?assertEqual(1, length(FirstMsgOnly)),
?assertEqual(
<<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
),
Limit = 19,
LastCont = lists:foldl(
fun(PageSeq, Cont) ->
Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]),
{ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
LastPos = lists:foldl(
fun(PageSeq, ThisPos) ->
Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]),
{ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
#{
<<"meta">> := #{<<"last">> := NextCont} = MetaP,
<<"data">> := MsgsP
} = emqx_utils_json:decode(MsgsRespP),
?assertMatch(#{<<"count">> := Count}, MetaP),
?assertNotEqual(<<"end_of_data">>, NextCont),
?assertEqual(length(MsgsP), Limit),
<<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart},
<<"data">> := MsgsPage
} = emqx_utils_json:decode(MsgsRespPage),
?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)),
%% Start position is the same in every response and points to the first msg
?assertEqual(StartPos, ThisStart),
?assertEqual(length(MsgsPage), Limit),
ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
ExpLastPayload = integer_to_binary(PageSeq * Limit),
?assertEqual(
ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding)
ExpFirstPayload,
decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding)
),
?assertEqual(
ExpLastPayload,
decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding)
),
NextCont
NextPos
end,
none,
lists:seq(1, Count div 19)
),
LastPartialPage = Count div 19 + 1,
LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]),
LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]),
{ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
#{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode(
#{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode(
MsgsRespLastP
),
?assertEqual(<<"end_of_data">>, EmptyCont),
?assertMatch(#{<<"count">> := Count}, MetaLastP),
%% The same as the position of all messages returned in one request
?assertEqual(Pos, LastPartialPos),
?assertEqual(
integer_to_binary(LastPartialPage * Limit - Limit + 1),
decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding)
),
?assertEqual(
integer_to_binary(Count),
decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding)
decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding)
),
ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [
PayloadEncoding, EmptyCont, Limit
ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [
PayloadEncoding, LastPartialPos, Limit
]),
{ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader)
#{
<<"data">> := [],
<<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos}
},
emqx_utils_json:decode(MsgsEmptyResp)
),
%% Invalid common page params
@ -1275,6 +1279,11 @@ test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) ->
emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
).
msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) ->
<<TsBin/binary, "_", (emqx_utils_conv:bin(Prio))/binary>>;
msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) ->
TsBin.
decode_payload(Payload, base64) ->
base64:decode(Payload);
decode_payload(Payload, _) ->

View File

@ -27,7 +27,7 @@ all() ->
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
emqx,
{emqx, "session_persistence.enable = true"},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
@ -204,13 +204,90 @@ t_shared_topics_invalid(_Config) ->
emqx_utils_json:decode(Body, [return_maps])
).
t_persistent_topics(_Config) ->
PersistentOpts = #{
proto_ver => v5,
properties => #{'Session-Expiry-Interval' => 300}
},
Client1 = client(t_persistent_topics_m1),
Client2 = client(t_persistent_topics_m2),
SessionId1 = <<"t_persistent_topics_p1">>,
SessionId2 = <<"t_persistent_topics_p2">>,
ClientPersistent1 = client(SessionId1, PersistentOpts),
ClientPersistent2 = client(SessionId2, PersistentOpts),
_ = [
?assertMatch({ok, _, _}, emqtt:subscribe(Client, Topic))
|| {Client, Topics} <- [
{Client1, [<<"t/client/mem">>, <<"t/+">>]},
{Client2, [<<"t/client/mem">>, <<"t/+">>]},
{ClientPersistent1, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]},
{ClientPersistent2, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]}
],
Topic <- Topics
],
Matched = request_json(get, ["topics"]),
?assertMatch(
#{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 8},
maps:get(<<"meta">>, Matched)
),
%% Get back both topics for both persistent and in-memory subscriptions.
Expected = [
#{<<"topic">> => <<"t/+">>, <<"node">> => atom_to_binary(node())},
#{<<"topic">> => <<"t/+">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/+">>, <<"session">> => SessionId2},
#{<<"topic">> => <<"t/client/mem">>, <<"node">> => atom_to_binary(node())},
#{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId2},
#{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId2}
],
?assertEqual(
lists:sort(Expected),
lists:sort(maps:get(<<"data">>, Matched))
),
%% Are results the same when paginating?
#{<<"data">> := Page1} = R1 = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
#{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
#{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
?assertEqual(
lists:sort(Expected),
lists:sort(Page1 ++ Page2 ++ Page3)
),
%% Count respects persistent sessions.
?assertMatch(
#{
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 3, <<"count">> := 8},
<<"data">> := [_, _, _]
},
R1
),
%% Filtering by node makes no sense for persistent sessions.
?assertMatch(
#{
<<"data">> := [
#{<<"topic">> := <<"t/client/mem">>, <<"node">> := _},
#{<<"topic">> := <<"t/+">>, <<"node">> := _}
],
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 2}
},
request_json(get, ["topics"], [{"node", atom_to_list(node())}])
).
%% Utilities
client(Name) ->
{ok, Client} = emqtt:start_link(#{
username => emqx_utils_conv:bin(Name),
clientid => emqx_utils_conv:bin(Name)
}),
client(Name, #{}).
client(Name, Overrides) ->
{ok, Client} = emqtt:start_link(
maps:merge(
#{
username => emqx_utils_conv:bin(Name),
clientid => emqx_utils_conv:bin(Name)
},
Overrides
)
),
{ok, _} = emqtt:connect(Client),
Client.

View File

@ -124,9 +124,12 @@ ensure_otel_metrics(
) ->
ok;
ensure_otel_metrics(#{metrics := #{enable := true}} = Conf, _Old) ->
ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(),
_ = emqx_otel_cpu_sup:start_otel_cpu_sup(Conf),
_ = emqx_otel_metrics:stop_otel(),
emqx_otel_metrics:start_otel(Conf);
ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) ->
ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(),
emqx_otel_metrics:stop_otel();
ensure_otel_metrics(_, _) ->
ok.

View File

@ -0,0 +1,146 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_cpu_sup).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
%% gen_server APIs
-export([start_link/1]).
-export([
start_otel_cpu_sup/1,
stop_otel_cpu_sup/0,
stats/1
]).
%% gen_server callbacks
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-define(REFRESH, refresh).
-define(OTEL_CPU_USAGE_WORKER, ?MODULE).
-define(SUPERVISOR, emqx_otel_sup).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_otel_cpu_sup(Conf) ->
Spec = emqx_otel_sup:worker_spec(?MODULE, Conf),
assert_started(supervisor:start_child(?SUPERVISOR, Spec)).
stop_otel_cpu_sup() ->
case erlang:whereis(?SUPERVISOR) of
undefined ->
ok;
Pid ->
case supervisor:terminate_child(Pid, ?MODULE) of
ok -> supervisor:delete_child(Pid, ?MODULE);
{error, not_found} -> ok;
Error -> Error
end
end.
stats(Name) ->
gen_server:call(?OTEL_CPU_USAGE_WORKER, {?FUNCTION_NAME, Name}, infinity).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% simply handle cpu_sup:util/0,1 called in one process
%%--------------------------------------------------------------------
start_link(Conf) ->
gen_server:start_link({local, ?OTEL_CPU_USAGE_WORKER}, ?MODULE, Conf, []).
init(Conf) ->
{ok, _InitState = #{}, {continue, {setup, Conf}}}.
%% Interval in milliseconds
handle_continue({setup, #{metrics := #{enable := true, interval := Interval}}}, State) ->
%% start os_mon temporarily
{ok, _} = application:ensure_all_started(os_mon),
%% The returned value of the first call to cpu_sup:util/0 or cpu_sup:util/1 by a
%% process will on most systems be the CPU utilization since system boot,
%% but this is not guaranteed and the value should therefore be regarded as garbage.
%% This also applies to the first call after a restart of cpu_sup.
_Val = cpu_sup:util(),
TRef = start_refresh_timer(Interval),
{noreply, State#{interval => Interval, refresh_time_ref => TRef}}.
handle_call({stats, Name}, _From, State) ->
{reply, get_stats(Name, State), State};
handle_call(stop, _From, State) ->
cancel_outdated_timer(State),
{stop, normal, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
handle_info({timeout, _Timer, ?REFRESH}, State) ->
{noreply, refresh(State)}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
refresh(#{interval := Interval} = State) ->
NState =
case cpu_sup:util([]) of
{all, U, I, _} ->
State#{'cpu.use' => U, 'cpu.idle' => I};
_ ->
State#{'cpu.use' => 0, 'cpu.idle' => 0}
end,
TRef = start_refresh_timer(Interval),
NState#{refresh_time_ref => TRef}.
get_stats(Name, State) ->
maps:get(Name, State, 0).
cancel_outdated_timer(#{refresh_time_ref := TRef}) ->
emqx_utils:cancel_timer(TRef),
ok.
start_refresh_timer(Interval) ->
start_timer(Interval, ?REFRESH).
start_timer(Interval, Msg) ->
emqx_utils:start_timer(Interval, Msg).
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> {error, Reason}.

View File

@ -197,6 +197,10 @@ bytes_metrics() ->
get_stats_gauge(Name) ->
[{emqx_stats:getstat(Name), #{}}].
get_vm_gauge('cpu.use') ->
[{emqx_otel_cpu_sup:stats('cpu.use'), #{}}];
get_vm_gauge('cpu.idle') ->
[{emqx_otel_cpu_sup:stats('cpu.idle'), #{}}];
get_vm_gauge(Name) ->
[{emqx_mgmt:vm_stats(Name), #{}}].
@ -254,8 +258,6 @@ create_counter(Meter, Counters, CallBack) ->
Counters
).
%% Note: list_to_existing_atom("cpu.use") will crash
%% so we make sure the atom is already existing here
normalize_name(cpu_use) ->
'cpu.use';
normalize_name(cpu_idle) ->

View File

@ -42,7 +42,12 @@ init([]) ->
},
Children =
case emqx_conf:get([opentelemetry]) of
#{metrics := #{enable := false}} -> [];
#{metrics := #{enable := true}} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
#{metrics := #{enable := false}} ->
[];
#{metrics := #{enable := true}} = Conf ->
[
worker_spec(emqx_otel_metrics, Conf),
worker_spec(emqx_otel_cpu_sup, Conf)
]
end,
{ok, {SupFlags, Children}}.

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.1.9"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.1.3"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,

View File

@ -2,7 +2,7 @@
{application, emqx_prometheus, [
{description, "Prometheus for EMQX"},
% strict semver, bump manually!
{vsn, "5.1.0"},
{vsn, "5.2.0"},
{modules, []},
{registered, [emqx_prometheus_sup]},
{applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.33"},
{vsn, "5.1.0"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [

View File

@ -6,7 +6,7 @@
{emqx_utils, {path, "../emqx_utils"}},
{emqx_rule_engine, {path, "../emqx_rule_engine"}},
{erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
{jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
{jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.8.0"}}},
{gpb, "4.19.9"}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.2.0"},
{vsn, "0.3.0"},
{registered, [emqx_schema_registry_sup]},
{mod, {emqx_schema_registry_app, []}},
{included_applications, [

View File

@ -37,6 +37,9 @@
%% Timestamp (Unit: millisecond)
timestamp :: integer(),
%% Miscellaneous extensions, currently used for OpenTelemetry context propagation
%% and storing mqueue/inflight insertion timestamps.
%% It was not used prior to 5.4.0 and defaulted to an empty list.
%% Must be a map now.
extra = #{} :: term()
}).

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.1.0"},
{vsn, "5.2.0"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -1,6 +1,7 @@
Implement log throttling. The feature reduces the number of potentially flooding logged events by
dropping all but the first event within a configured time window.
Throttling is applied to the following log events:
- authentication_failure,
- authorization_permission_denied,
- cannot_publish_to_topic_due_to_not_authorized,
- cannot_publish_to_topic_due_to_quota_exceeded,

View File

@ -1,21 +1,20 @@
Implement HTTP APIs to get the list of client's inflight and mqueue messages.
Implement HTTP APIs to get the list of client's in-flight and mqueue messages.
To get the first chunk of data:
- GET /clients/{clientid}/mqueue_messages?limit=100
- GET /clients/{clientid}/inflight_messages?limit=100
Alternatively:
- GET /clients/{clientid}/mqueue_messages?limit=100&after=none
- GET /clients/{clientid}/inflight_messages?limit=100&after=none
- GET /clients/{clientid}/mqueue_messages?limit=100&position=none
- GET /clients/{clientid}/inflight_messages?limit=100&position=none
To get the next chunk of data:
- GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
- GET /clients/{clientid}/inflight_messages?limit=100&after={last}
- GET /clients/{clientid}/mqueue_messages?limit=100&position={position}
- GET /clients/{clientid}/inflight_messages?limit=100&position={position}
Where {last} is a value (opaque string token) of "meta.last" field from the previous response.
Where {position} is a value (opaque string token) of "meta.position" field from the previous response.
If there is no more data, "last" = "end_of_data" is returned.
If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
Mqueue messages are ordered according to their priority and queue (FIFO) order: from higher priority to lower priority.
By default, all messages in Mqueue have the same priority of 0.
Mqueue messages are ordered according to the queue (FIFO) order.
Inflight messages are ordered by MQTT Packet Id, which may not represent the chronological messages order.
In-flight messages are ordered by time at which they were inserted to the in-flight storage (from older to newer messages).

View File

@ -0,0 +1,3 @@
Add `username` log field.
If MQTT client is connected with a non-empty username the logs and traces will include `username` field.

Some files were not shown because too many files have changed in this diff Show More