Merge remote-tracking branch 'origin/release-57'

This commit is contained in:
ieQu1 2024-05-21 15:04:46 +02:00
commit acb19a06cf
93 changed files with 1686 additions and 761 deletions

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PERSISTENT_MESSAGE_HRL).
-define(EMQX_PERSISTENT_MESSAGE_HRL, true).
-define(PERSISTENT_MESSAGE_DB, messages).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WITH_DURABILITY_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
-endif.

View File

@ -56,8 +56,8 @@ init_per_testcase(t_session_gc = TestCase, Config) ->
n => 3,
roles => [core, core, core],
extra_emqx_conf =>
"\n session_persistence {"
"\n last_alive_update_interval = 500ms "
"\n durable_sessions {"
"\n heartbeat_interval = 500ms "
"\n session_gc_interval = 1s "
"\n session_gc_batch_size = 2 "
"\n }"
@ -116,7 +116,7 @@ app_specs() ->
app_specs(_Opts = #{}).
app_specs(Opts) ->
DefaultEMQXConf = "session_persistence {enable = true, renew_streams_interval = 1s}",
DefaultEMQXConf = "durable_sessions {enable = true, renew_streams_interval = 1s}",
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
{emqx, DefaultEMQXConf ++ ExtraEMQXConf}

View File

@ -111,6 +111,11 @@ reclaim_seq(Topic) ->
stats_fun() ->
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
safe_update_stats(
durable_subscription_count(),
'durable_subscriptions.count',
'durable_subscriptions.max'
),
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
safe_update_stats(undefined, _Stat, _MaxStat) ->
@ -118,15 +123,13 @@ safe_update_stats(undefined, _Stat, _MaxStat) ->
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
emqx_stats:setstat(Stat, MaxStat, Val).
%% N.B.: subscriptions from durable sessions are not tied to any particular node.
%% Therefore, do not sum them with node-local subscriptions.
subscription_count() ->
NonPSCount = table_size(?SUBSCRIPTION),
PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
case is_integer(NonPSCount) of
true ->
NonPSCount + PSCount;
false ->
PSCount
end.
table_size(?SUBSCRIPTION).
durable_subscription_count() ->
emqx_persistent_session_bookkeeper:get_subscription_count().
subscriber_val() ->
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).

View File

@ -126,7 +126,7 @@ fields(builtin) ->
sc(
pos_integer(),
#{
default => 16,
default => 12,
importance => ?IMPORTANCE_MEDIUM,
desc => ?DESC(builtin_n_shards)
}

View File

@ -76,7 +76,9 @@ best_effort_json_obj(Input) ->
best_effort_json_obj(Input, Config).
-spec format(logger:log_event(), config()) -> iodata().
format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) ->
format(#{level := _Level, msg := _Msg, meta := _Meta} = Entry, Config0) when is_map(Config0) ->
#{level := Level, msg := Msg, meta := Meta} =
emqx_logger_textfmt:evaluate_lazy_values_if_dbg_level(Entry),
Config = add_default_config(Config0),
[format(Msg, Meta#{level => Level}, Config), "\n"].
@ -219,7 +221,7 @@ best_effort_unicode(Input, Config) ->
best_effort_json_obj(List, Config) when is_list(List) ->
try
json_obj(maps:from_list(List), Config)
json_obj(convert_tuple_list_to_map(List), Config)
catch
_:_ ->
[json(I, Config) || I <- List]
@ -232,6 +234,16 @@ best_effort_json_obj(Map, Config) ->
do_format_msg("~p", [Map], Config)
end.
%% This function will throw if the list do not only contain tuples or if there
%% are duplicate keys.
convert_tuple_list_to_map(List) ->
%% Crash if this is not a tuple list
CandidateMap = maps:from_list(List),
%% Crash if there are duplicates
NumberOfItems = length(List),
NumberOfItems = maps:size(CandidateMap),
CandidateMap.
json(A, _) when is_atom(A) -> A;
json(I, _) when is_integer(I) -> I;
json(F, _) when is_float(F) -> F;

View File

@ -16,15 +16,20 @@
-module(emqx_logger_textfmt).
-include("emqx_trace.hrl").
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1]).
%% Used in the other log formatters
-export([evaluate_lazy_values_if_dbg_level/1, evaluate_lazy_values/1]).
check_config(X) -> logger_formatter:check_config(maps:without([timestamp_format], X)).
%% Principle here is to delegate the formatting to logger_formatter:format/2
%% as much as possible, and only enrich the report with clientid, peername, topic, username
format(#{msg := {report, ReportMap}, meta := Meta} = Event, Config) when is_map(ReportMap) ->
format(#{msg := {report, ReportMap0}, meta := _Meta} = Event0, Config) when is_map(ReportMap0) ->
#{msg := {report, ReportMap}, meta := Meta} = Event = evaluate_lazy_values_if_dbg_level(Event0),
%% The most common case, when entering from SLOG macro
%% i.e. logger:log(Level, #{msg => "my_msg", foo => bar})
ReportList = enrich_report(ReportMap, Meta),
@ -40,12 +45,39 @@ format(#{msg := {string, String}} = Event, Config) ->
%% copied from logger_formatter:format/2
%% unsure how this case is triggered
format(Event#{msg => {"~ts ", [String]}}, Config);
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
format(#{msg := _Msg, meta := _Meta} = Event0, Config) ->
#{msg := Msg0, meta := Meta} = Event1 = evaluate_lazy_values_if_dbg_level(Event0),
%% For format strings like logger:log(Level, "~p", [Var])
%% and logger:log(Level, "message", #{key => value})
Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_topic(Msg1, Meta),
fmt(Event#{msg := Msg2}, Config).
fmt(Event1#{msg := Msg2}, Config).
%% Most log entries with lazy values are trace events with level debug. So to
%% be more efficient we only search for lazy values to evaluate in the entries
%% with level debug in the main log formatters.
evaluate_lazy_values_if_dbg_level(#{level := debug} = Map) ->
evaluate_lazy_values(Map);
evaluate_lazy_values_if_dbg_level(Map) ->
Map.
evaluate_lazy_values(Map) when is_map(Map) ->
maps:map(fun evaluate_lazy_values_kv/2, Map);
evaluate_lazy_values({report, Report}) ->
{report, evaluate_lazy_values(Report)};
evaluate_lazy_values(V) ->
V.
evaluate_lazy_values_kv(_K, #emqx_trace_format_func_data{function = Formatter, data = V}) ->
try
NewV = Formatter(V),
evaluate_lazy_values(NewV)
catch
_:_ ->
V
end;
evaluate_lazy_values_kv(_K, V) ->
evaluate_lazy_values(V).
fmt(#{meta := #{time := Ts}} = Data, Config) ->
Timestamp =

View File

@ -32,15 +32,7 @@
persist/1
]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(PERSISTENCE_ENABLED, emqx_message_persistence_enabled).
-define(WHEN_ENABLED(DO),
case is_persistence_enabled() of
true -> DO;
false -> {skipped, disabled}
end
).
-include("emqx_persistent_message.hrl").
%%--------------------------------------------------------------------
@ -51,7 +43,7 @@ init() ->
Zones = maps:keys(emqx_config:get([zones])),
IsEnabled = lists:any(fun is_persistence_enabled/1, Zones),
persistent_term:put(?PERSISTENCE_ENABLED, IsEnabled),
?WHEN_ENABLED(begin
?WITH_DURABILITY_ENABLED(begin
?SLOG(notice, #{msg => "Session durability is enabled"}),
Backend = storage_backend(),
ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, Backend),
@ -66,7 +58,7 @@ is_persistence_enabled() ->
-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
is_persistence_enabled(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, enable]).
emqx_config:get_zone_conf(Zone, [durable_sessions, enable]).
-spec storage_backend() -> emqx_ds:create_db_opts().
storage_backend() ->
@ -76,7 +68,7 @@ storage_backend() ->
%% `emqx_persistent_session_ds':
-spec force_ds(emqx_types:zone()) -> boolean().
force_ds(Zone) ->
emqx_config:get_zone_conf(Zone, [session_persistence, force_persistence]).
emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
storage_backend(Path) ->
ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
@ -86,12 +78,12 @@ storage_backend(Path) ->
-spec add_handler() -> ok.
add_handler() ->
emqx_config_handler:add_handler([session_persistence], ?MODULE).
emqx_config_handler:add_handler([durable_sessions], ?MODULE).
pre_config_update([session_persistence], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
pre_config_update([durable_sessions], #{<<"enable">> := New}, #{<<"enable">> := Old}) when
New =/= Old
->
{error, "Hot update of session_persistence.enable parameter is currently not supported"};
{error, "Hot update of durable_sessions.enable parameter is currently not supported"};
pre_config_update(_Root, _NewConf, _OldConf) ->
ok.
@ -100,7 +92,7 @@ pre_config_update(_Root, _NewConf, _OldConf) ->
-spec persist(emqx_types:message()) ->
emqx_ds:store_batch_result() | {skipped, needs_no_persistence}.
persist(Msg) ->
?WHEN_ENABLED(
?WITH_DURABILITY_ENABLED(
case needs_persistence(Msg) andalso has_subscribers(Msg) of
true ->
store_message(Msg);

View File

@ -81,7 +81,7 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------------------
ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, message_retention_period]),
Timeout = emqx_config:get([durable_sessions, message_retention_period]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.
@ -114,7 +114,7 @@ now_ms() ->
maybe_gc() ->
AllGens = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
NowMS = now_ms(),
RetentionPeriod = emqx_config:get([session_persistence, message_retention_period]),
RetentionPeriod = emqx_config:get([durable_sessions, message_retention_period]),
TimeThreshold = NowMS - RetentionPeriod,
maybe_create_new_generation(AllGens, TimeThreshold),
?tp_span(

View File

@ -102,6 +102,6 @@ tally_persistent_subscriptions(State0) ->
State0#{subs_count := N}.
ensure_subs_tally_timer() ->
Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]),
Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),
_ = erlang:send_after(Timeout, self(), #tally_subs{}),
ok.

View File

@ -1146,14 +1146,14 @@ receive_maximum(ConnInfo) ->
expiry_interval(ConnInfo) ->
maps:get(expiry_interval, ConnInfo, 0).
%% Note: we don't allow overriding `last_alive_update_interval' per
%% Note: we don't allow overriding `heartbeat_interval' per
%% zone, since the GC process is responsible for all sessions
%% regardless of the zone.
bump_interval() ->
emqx_config:get([session_persistence, last_alive_update_interval]).
emqx_config:get([durable_sessions, heartbeat_interval]).
get_config(#{zone := Zone}, Key) ->
emqx_config:get_zone_conf(Zone, [session_persistence | Key]).
emqx_config:get_zone_conf(Zone, [durable_sessions | Key]).
-spec try_get_live_session(emqx_types:clientid()) ->
{pid(), session()} | not_found | not_persistent.
@ -1182,7 +1182,12 @@ maybe_set_offline_info(S, Id) ->
case emqx_cm:lookup_client({clientid, Id}) of
[{_Key, ChannelInfo, Stats}] ->
emqx_persistent_session_ds_state:set_offline_info(
#{chan_info => ChannelInfo, stats => Stats},
#{
chan_info => ChannelInfo,
stats => Stats,
disconnected_at => erlang:system_time(millisecond),
last_connected_to => node()
},
S
);
_ ->

View File

@ -16,7 +16,7 @@
-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
-define(SESSION_TAB, emqx_ds_session).
-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).

View File

@ -93,7 +93,7 @@ handle_info(_Info, State) ->
%%--------------------------------------------------------------------------------
ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, session_gc_interval]),
Timeout = emqx_config:get([durable_sessions, session_gc_interval]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.
@ -133,8 +133,8 @@ start_gc() ->
).
gc_context() ->
GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
GCInterval = emqx_config:get([durable_sessions, session_gc_interval]),
BumpInterval = emqx_config:get([durable_sessions, heartbeat_interval]),
TimeThreshold = max(GCInterval, BumpInterval) * 3,
NowMS = now_ms(),
#{
@ -149,7 +149,7 @@ gc_context() ->
}.
gc_loop(MinLastAlive, MinLastAliveWillMsg, It0) ->
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
GCBatchSize = emqx_config:get([durable_sessions, session_gc_batch_size]),
case emqx_persistent_session_ds_state:session_iterator_next(It0, GCBatchSize) of
{[], _It} ->
ok;

View File

@ -303,9 +303,9 @@ roots(low) ->
converter => fun flapping_detect_converter/2
}
)},
{session_persistence,
{durable_sessions,
sc(
ref("session_persistence"),
ref("durable_sessions"),
#{
importance => ?IMPORTANCE_HIDDEN
}
@ -1652,12 +1652,12 @@ fields("trace") ->
desc => ?DESC(fields_trace_payload_encode)
})}
];
fields("session_persistence") ->
fields("durable_sessions") ->
[
{"enable",
sc(
boolean(), #{
desc => ?DESC(session_persistence_enable),
desc => ?DESC(durable_sessions_enable),
default => false
}
)},
@ -1678,12 +1678,12 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_idle_poll_interval)
}
)},
{"last_alive_update_interval",
{"heartbeat_interval",
sc(
timeout_duration(),
#{
default => <<"5000ms">>,
desc => ?DESC(session_ds_last_alive_update_interval)
desc => ?DESC(session_ds_heartbeat_interval)
}
)},
{"renew_streams_interval",
@ -2091,7 +2091,7 @@ desc("ocsp") ->
"Per listener OCSP Stapling configuration.";
desc("crl_cache") ->
"Global CRL cache options.";
desc("session_persistence") ->
desc("durable_sessions") ->
"Settings governing durable sessions persistence.";
desc(durable_storage) ->
?DESC(durable_storage);

View File

@ -109,6 +109,8 @@
%% PubSub stats
-define(PUBSUB_STATS, [
'durable_subscriptions.count',
'durable_subscriptions.max',
'topics.count',
'topics.max',
'suboptions.count',
@ -166,6 +168,8 @@ names() ->
[
emqx_connections_count,
emqx_connections_max,
emqx_durable_subscriptions_count,
emqx_durable_subscriptions_max,
emqx_live_connections_count,
emqx_live_connections_max,
emqx_cluster_sessions_count,

View File

@ -15,11 +15,9 @@
%%--------------------------------------------------------------------
-module(emqx_trace_formatter).
-include("emqx_mqtt.hrl").
-include("emqx_trace.hrl").
-export([format/2]).
-export([format_meta_map/1]).
-export([evaluate_lazy_values/1]).
%% logger_formatter:config/0 is not exported.
-type config() :: map().
@ -30,35 +28,20 @@
LogEvent :: logger:log_event(),
Config :: config().
format(
#{level := debug, meta := Meta0 = #{trace_tag := Tag}, msg := Msg},
#{level := debug, meta := _Meta = #{trace_tag := _Tag}, msg := _Msg} = Entry,
#{payload_encode := PEncode}
) ->
Meta1 = evaluate_lazy_values(Meta0),
#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg} =
emqx_logger_textfmt:evaluate_lazy_values(Entry),
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
ClientId = to_iolist(maps:get(clientid, Meta1, "")),
Peername = maps:get(peername, Meta1, ""),
MetaBin = format_meta(Meta1, PEncode),
ClientId = to_iolist(maps:get(clientid, Meta, "")),
Peername = maps:get(peername, Meta, ""),
MetaBin = format_meta(Meta, PEncode),
Msg1 = to_iolist(Msg),
Tag1 = to_iolist(Tag),
[Time, " [", Tag1, "] ", ClientId, "@", Peername, " msg: ", Msg1, ", ", MetaBin, "\n"];
format(Event, Config) ->
emqx_logger_textfmt:format(evaluate_lazy_values(Event), Config).
evaluate_lazy_values(Map) when is_map(Map) ->
maps:map(fun evaluate_lazy_values_kv/2, Map);
evaluate_lazy_values(V) ->
V.
evaluate_lazy_values_kv(_K, #emqx_trace_format_func_data{function = Formatter, data = V}) ->
try
NewV = Formatter(V),
evaluate_lazy_values(NewV)
catch
_:_ ->
V
end;
evaluate_lazy_values_kv(_K, V) ->
evaluate_lazy_values(V).
emqx_logger_textfmt:format(Event, Config).
format_meta_map(Meta) ->
Encode = emqx_trace_handler:payload_encode(),

View File

@ -36,8 +36,8 @@ format(
LogMap,
#{payload_encode := PEncode} = Config
) ->
LogMap0 = maybe_format_msg(LogMap, Config),
LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0),
LogMap0 = emqx_logger_textfmt:evaluate_lazy_values(LogMap),
LogMap1 = maybe_format_msg(LogMap0, Config),
%% We just make some basic transformations on the input LogMap and then do
%% an external call to create the JSON text
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),

View File

@ -34,7 +34,7 @@ roots() ->
conn_congestion,
force_gc,
overload_protection,
session_persistence
durable_sessions
].
zones_without_default() ->

View File

@ -580,7 +580,7 @@ t_handle_out_publish_1(_) ->
{ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan} =
emqx_channel:handle_out(publish, [{1, Msg}], channel()).
t_handle_out_connack_sucess(_) ->
t_handle_out_connack_success(_) ->
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).

View File

@ -382,7 +382,7 @@ t_init_zone_with_global_defaults(Config) when is_list(Config) ->
%% when put zones with global default with emqx_config:put/1
GlobalDefaults = zone_global_defaults(),
AllConf = maps:put(zones, Zones, GlobalDefaults),
%% Then put sucess
%% Then put success
?assertEqual(ok, emqx_config:put(AllConf)),
%% Then GlobalDefaults are set
?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))),
@ -465,13 +465,13 @@ zone_global_defaults() ->
enable => false
},
stats => #{enable => true},
session_persistence =>
durable_sessions =>
#{
enable => false,
batch_size => 100,
force_persistence => false,
idle_poll_interval => 100,
last_alive_update_interval => 5000,
heartbeat_interval => 5000,
message_retention_period => 86400000,
renew_streams_interval => 5000,
session_gc_batch_size => 100,

View File

@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_logger_fmt_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include("emqx_trace.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, Config) ->
Config.
t_text_fmt_lazy_values(_) ->
check_fmt_lazy_values(emqx_logger_textfmt).
t_text_fmt_lazy_values_only_in_debug_level_events(_) ->
check_fmt_lazy_values_only_in_debug_level_events(emqx_logger_textfmt).
t_json_fmt_lazy_values(_) ->
check_fmt_lazy_values(emqx_logger_jsonfmt).
t_json_fmt_lazy_values_only_in_debug_level_events(_) ->
check_fmt_lazy_values_only_in_debug_level_events(emqx_logger_jsonfmt).
check_fmt_lazy_values(FormatModule) ->
LogEntryIOData = FormatModule:format(event_with_lazy_value(), conf()),
LogEntryBin = unicode:characters_to_binary(LogEntryIOData),
%% Result of lazy evealuation should exist
?assertNotEqual(nomatch, binary:match(LogEntryBin, [<<"hej">>])),
%% The lazy value should have been evaluated
?assertEqual(nomatch, binary:match(LogEntryBin, [<<"emqx_trace_format_func_data">>])),
ok.
check_fmt_lazy_values_only_in_debug_level_events(FormatModule) ->
%% For performace reason we only search for lazy values to evaluate if log level is debug
WarningEvent = (event_with_lazy_value())#{level => info},
LogEntryIOData = FormatModule:format(WarningEvent, conf()),
LogEntryBin = unicode:characters_to_binary(LogEntryIOData),
%% The input data for the formatting should exist
?assertNotEqual(nomatch, binary:match(LogEntryBin, [<<"hej">>])),
%% The lazy value should not have been evaluated
?assertNotEqual(nomatch, binary:match(LogEntryBin, [<<"emqx_trace_format_func_data">>])),
ok.
conf() ->
#{
time_offset => [],
chars_limit => unlimited,
depth => 100,
single_line => true,
template => ["[", level, "] ", msg, "\n"],
timestamp_format => auto
}.
event_with_lazy_value() ->
#{
meta => #{
pid => what,
time => 1715763862274127,
gl => what,
report_cb => fun logger:format_otp_report/1
},
msg =>
{report, #{
reason =>
#emqx_trace_format_func_data{function = fun(Data) -> Data end, data = hej},
msg => hej
}},
level => debug
}.

View File

@ -27,7 +27,7 @@
-import(emqx_common_test_helpers, [on_exit/1]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -46,7 +46,7 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n session_persistence.message_retention_period = 3s"
"\n durable_sessions.message_retention_period = 3s"
"\n durable_storage.messages.n_shards = 3"
},
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
@ -554,7 +554,7 @@ app_specs(Opts) ->
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
emqx_durable_storage,
{emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
{emqx, "durable_sessions {enable = true}" ++ ExtraEMQXConf}
].
cluster() ->

View File

@ -26,7 +26,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-include("emqx_persistent_message.hrl").
-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
%%--------------------------------------------------------------------
@ -67,7 +68,7 @@ groups() ->
init_per_group(persistence_disabled, Config) ->
[
{emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
{emqx_config, ?EMQX_CONFIG ++ "durable_sessions { enable = false }"},
{persistence, false}
| Config
];
@ -75,9 +76,9 @@ init_per_group(persistence_enabled, Config) ->
[
{emqx_config,
?EMQX_CONFIG ++
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" heartbeat_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
" session_gc_interval = 2s\n"
"}"},

View File

@ -38,7 +38,7 @@ init_per_suite(Config) ->
AppSpecs = [
emqx_durable_storage,
{emqx, #{
config => #{session_persistence => #{enable => true}},
config => #{durable_sessions => #{enable => true}},
override_env => [{boot_modules, [broker]}]
}}
],

View File

@ -69,9 +69,9 @@ init_per_group(persistence_enabled = Group, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence = {\n"
"durable_sessions = {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" heartbeat_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
" session_gc_interval = 2s\n"
"}\n"}
@ -85,7 +85,7 @@ init_per_group(persistence_enabled = Group, Config) ->
];
init_per_group(persistence_disabled = Group, Config) ->
Apps = emqx_cth_suite:start(
[{emqx, "session_persistence.enable = false"}],
[{emqx, "durable_sessions.enable = false"}],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
[

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_http, [
{description, "EMQX External HTTP API Authentication and Authorization"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{mod, {emqx_auth_http_app, []}},
{applications, [

View File

@ -118,8 +118,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info,
emqx_bridge_s3_upload_action_info,
emqx_bridge_s3_aggreg_upload_action_info
emqx_bridge_s3_upload_action_info
].
-else.
hard_coded_action_info_modules_ee() ->

View File

@ -12,6 +12,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
%% schema
-export([roots/0, fields/1, desc/1, namespace/0]).
@ -273,11 +274,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
_ -> none
end,
emqx_trace:rendered_action_template(ChannelID, #{
cqls => CQLs
cqls => #emqx_trace_format_func_data{data = CQLs, function = fun trace_format_cql_tuples/1}
}),
Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
handle_result(Res).
trace_format_cql_tuples(Tuples) ->
[CQL || {_, CQL} <- Tuples].
parse_request_to_cql({query, CQL}) ->
{query, CQL, #{}};
parse_request_to_cql({query, CQL, Params}) ->

View File

@ -422,7 +422,7 @@ is_auth_key(_) ->
%% -------------------------------------------------------------------------------------------------
%% Query
do_query(InstId, Channel, Client, Points) ->
emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
emqx_trace:rendered_action_template(Channel, #{points => Points}),
case greptimedb:write_batch(Client, Points) of
{ok, #{response := {affected_rows, #{value := Rows}}}} ->
?SLOG(debug, #{
@ -465,7 +465,7 @@ do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
connector => InstId,
points => Points
}),
emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
emqx_trace:rendered_action_template(Channel, #{points => Points}),
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).

View File

@ -323,7 +323,7 @@ on_query(
#{headers_config => KafkaHeaders, instance_id => InstId}
),
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage, send_type => sync
message => KafkaMessage
}),
do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
catch
@ -383,7 +383,7 @@ on_query_async(
#{headers_config => KafkaHeaders, instance_id => InstId}
),
emqx_trace:rendered_action_template(MessageTag, #{
message => KafkaMessage, send_type => async
message => KafkaMessage
}),
do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
catch
@ -506,7 +506,7 @@ on_get_status(
) ->
%% Note: we must avoid returning `?status_disconnected' here if the connector ever was
%% connected. If the connector ever connected, wolff producers might have been
%% sucessfully started, and returning `?status_disconnected' will make resource
%% successfully started, and returning `?status_disconnected' will make resource
%% manager try to restart the producers / connector, thus potentially dropping data
%% held in wolff producer's replayq.
case check_client_connectivity(ClientId) of

View File

@ -119,7 +119,7 @@ on_query(
),
emqx_trace:rendered_action_template(
MessageTag,
#{command => Cmd, batch => false, mode => sync}
#{command => Cmd, batch => false}
),
Result = query(InstId, {cmd, Cmd}, RedisConnSt),
?tp(
@ -143,7 +143,7 @@ on_batch_query(
[{ChannelID, _} | _] = BatchData,
emqx_trace:rendered_action_template(
ChannelID,
#{commands => Cmds, batch => ture, mode => sync}
#{commands => Cmds, batch => ture}
),
Result = query(InstId, {cmds, Cmds}, RedisConnSt),
?tp(

View File

@ -347,7 +347,7 @@ parse_dispatch_strategy(#{strategy := Template}) ->
%% better distribute the load, effectively making it `random'
%% dispatch if the key is absent and we are using `key_dispatch'.
%% Otherwise, it'll be deterministic.
emqx_guid:gen();
emqx_guid:to_base62(emqx_guid:gen());
Key ->
Key
end

View File

@ -12,8 +12,7 @@
]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_s3_upload_action_info,
emqx_bridge_s3_aggreg_upload_action_info
emqx_bridge_s3_upload_action_info
]},
{emqx_connector_info_modules, [
emqx_bridge_s3_connector_info

View File

@ -8,8 +8,6 @@
%% Actions
-define(ACTION_UPLOAD, s3).
-define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
-define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
-define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
-define(CONNECTOR, s3).

View File

@ -1,275 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_s3.hrl").
-define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
-define(DEFAULT_BATCH_SIZE, 100).
-define(DEFAULT_BATCH_TIME, <<"10ms">>).
-behaviour(hocon_schema).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% Interpreting options
-export([
mk_key_template/1,
mk_upload_options/1
]).
%% emqx_bridge_v2_schema API
-export([bridge_v2_examples/1]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_s3_aggreg_upload".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
fields(action) ->
{?ACTION,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
#{
desc => <<"S3 Aggregated Upload Action Config">>,
required => false
}
)};
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
?R_REF(s3_aggregated_upload_parameters),
#{
required => true,
desc => ?DESC(s3_aggregated_upload_parameters)
}
),
#{
resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
}
);
fields(s3_aggregated_upload_parameters) ->
lists:append([
[
{container,
hoconsc:mk(
%% TODO: Support selectors once there are more than one container.
hoconsc:union(fun
(all_union_members) -> [?REF(s3_aggregated_container_csv)];
({value, _Valur}) -> [?REF(s3_aggregated_container_csv)]
end),
#{
required => true,
default => #{<<"type">> => <<"csv">>},
desc => ?DESC(s3_aggregated_container)
}
)},
{aggregation,
hoconsc:mk(
?REF(s3_aggregation),
#{
required => true,
desc => ?DESC(s3_aggregation)
}
)}
],
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
]),
emqx_s3_schema:fields(s3_uploader)
]);
fields(s3_aggregated_container_csv) ->
[
{type,
hoconsc:mk(
csv,
#{
required => true,
desc => ?DESC(s3_aggregated_container_csv)
}
)},
{column_order,
hoconsc:mk(
hoconsc:array(string()),
#{
required => false,
default => [],
desc => ?DESC(s3_aggregated_container_csv_column_order)
}
)}
];
fields(s3_aggregation) ->
[
%% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
{time_interval,
hoconsc:mk(
emqx_schema:duration_s(),
#{
required => false,
default => <<"1h">>,
desc => ?DESC(s3_aggregation_interval)
}
)},
{max_records,
hoconsc:mk(
pos_integer(),
#{
required => false,
default => <<"1000000">>,
desc => ?DESC(s3_aggregation_max_records)
}
)}
];
fields(s3_aggreg_upload_resource_opts) ->
%% NOTE: This action should benefit from generous batching defaults.
emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
{batch_time, #{default => ?DEFAULT_BATCH_TIME}}
]).
desc(Name) when
Name == s3_aggregated_upload;
Name == s3_aggregated_upload_parameters;
Name == s3_aggregation;
Name == s3_aggregated_container_csv
->
?DESC(Name);
desc(s3_aggreg_upload_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%% Interpreting options
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key),
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
mk_suffix_template(UsedBindings) ->
RequiredBindings = ["action", "node", "datetime.", "sequence"],
SuffixBindings = [
mk_default_binding(RB)
|| RB <- RequiredBindings,
lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
],
SuffixTemplate = [["/", B] || B <- SuffixBindings],
emqx_template:parse(SuffixTemplate).
mk_default_binding("datetime.") ->
"${datetime.rfc3339utc}";
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
mk_upload_options(Parameters) ->
Headers = mk_upload_headers(Parameters),
#{
headers => Headers,
acl => maps:get(acl, Parameters, undefined)
}.
mk_upload_headers(Parameters = #{container := Container}) ->
Headers = normalize_headers(maps:get(headers, Parameters, #{})),
ContainerHeaders = mk_container_headers(Container),
maps:merge(ContainerHeaders, Headers).
normalize_headers(Headers) ->
maps:fold(
fun(Header, Value, Acc) ->
maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
end,
#{},
Headers
).
mk_container_headers(#{type := csv}) ->
#{"content-type" => "text/csv"};
mk_container_headers(#{}) ->
#{}.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"s3_aggregated_upload">> => #{
summary => <<"S3 Aggregated Upload">>,
value => s3_action_example(Method)
}
}
].
s3_action_example(post) ->
maps:merge(
s3_action_example(put),
#{
type => atom_to_binary(?ACTION_UPLOAD),
name => <<"my_s3_action">>
}
);
s3_action_example(get) ->
maps:merge(
s3_action_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
s3_action_example(put) ->
#{
enable => true,
connector => <<"my_s3_connector">>,
description => <<"My action">>,
parameters => #{
bucket => <<"mqtt-aggregated">>,
key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
acl => <<"public_read">>,
aggregation => #{
time_interval => <<"15m">>,
max_records => 100_000
},
<<"container">> => #{
type => <<"csv">>,
column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
}
},
resource_opts => #{
health_check_interval => <<"10s">>,
query_mode => <<"async">>,
inflight_window => 100
}
}.

View File

@ -1,21 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_s3_aggreg_upload_action_info).
-behaviour(emqx_action_info).
-include("emqx_bridge_s3.hrl").
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
connector_type_name() -> s3.
schema_module() -> emqx_bridge_s3_aggreg_upload.

View File

@ -52,6 +52,7 @@
}.
-type s3_upload_parameters() :: #{
mode := direct,
bucket := string(),
key := string(),
content := string(),
@ -59,6 +60,7 @@
}.
-type s3_aggregated_upload_parameters() :: #{
mode := aggregated,
bucket := string(),
key := string(),
acl => emqx_s3:acl(),
@ -187,22 +189,24 @@ on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) ->
start_channel(_State, #{
bridge_type := ?BRIDGE_TYPE_UPLOAD,
parameters := Parameters = #{
mode := Mode = direct,
bucket := Bucket,
key := Key,
content := Content
}
}) ->
#{
type => ?ACTION_UPLOAD,
mode => Mode,
bucket => emqx_template:parse(Bucket),
key => emqx_template:parse(Key),
content => emqx_template:parse(Content),
upload_options => upload_options(Parameters)
};
start_channel(State, #{
bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD,
bridge_type := Type = ?BRIDGE_TYPE_UPLOAD,
bridge_name := Name,
parameters := Parameters = #{
mode := Mode = aggregated,
aggregation := #{
time_interval := TimeInterval,
max_records := MaxRecords
@ -219,9 +223,9 @@ start_channel(State, #{
},
DeliveryOpts = #{
bucket => Bucket,
key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
key => emqx_bridge_s3_upload:mk_key_template(Parameters),
container => Container,
upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters),
callback_module => ?MODULE,
client_config => maps:get(client_config, State),
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
@ -235,7 +239,7 @@ start_channel(State, #{
restart => permanent
}),
#{
type => ?ACTION_AGGREGATED_UPLOAD,
mode => Mode,
name => Name,
aggreg_id => AggregId,
bucket => Bucket,
@ -254,14 +258,12 @@ stop_channel(#{on_stop := OnStop}) ->
stop_channel(_ChannelState) ->
ok.
channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
channel_status(#{mode := direct}, _State) ->
%% TODO
%% Since bucket name may be templated, we can't really provide any additional
%% information regarding the channel health.
?status_connected;
channel_status(
#{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
) ->
channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, State) ->
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second),
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
@ -305,9 +307,9 @@ check_aggreg_upload_errors(AggregId) ->
{ok, _Result} | {error, _Reason}.
on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
case maps:get(Tag, Channels, undefined) of
ChannelState = #{type := ?ACTION_UPLOAD} ->
ChannelState = #{mode := direct} ->
run_simple_upload(InstId, Tag, Data, ChannelState, Config);
ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
ChannelState = #{mode := aggregated} ->
run_aggregated_upload(InstId, [Data], ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
@ -317,7 +319,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels})
{ok, _Result} | {error, _Reason}.
on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
case maps:get(Tag, Channels, undefined) of
ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
ChannelState = #{mode := aggregated} ->
Records = [Data0 | [Data || {_, Data} <- Rest]],
run_aggregated_upload(InstId, Records, ChannelState);
undefined ->

View File

@ -18,10 +18,22 @@
desc/1
]).
%% Interpreting options
-export([
mk_key_template/1,
mk_upload_options/1
]).
-export([
bridge_v2_examples/1
]).
%% Internal exports
-export([convert_actions/2]).
-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@ -44,25 +56,37 @@ fields(action) ->
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
#{
desc => <<"S3 Upload Action Config">>,
required => false
required => false,
converter => fun ?MODULE:convert_actions/2
}
)};
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
?R_REF(s3_upload_parameters),
mkunion(mode, #{
<<"direct">> => ?R_REF(s3_direct_upload_parameters),
<<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters)
}),
#{
required => true,
desc => ?DESC(s3_upload)
}
),
#{
resource_opts_ref => ?R_REF(s3_action_resource_opts)
resource_opts_ref => ?R_REF(s3_upload_resource_opts)
}
);
fields(s3_upload_parameters) ->
fields(s3_direct_upload_parameters) ->
emqx_s3_schema:fields(s3_upload) ++
[
{mode,
hoconsc:mk(
direct,
#{
required => true,
desc => ?DESC(s3_direct_upload_mode)
}
)},
{content,
hoconsc:mk(
emqx_schema:template(),
@ -73,49 +97,224 @@ fields(s3_upload_parameters) ->
}
)}
];
fields(s3_action_resource_opts) ->
UnsupportedOpts = [batch_size, batch_time],
lists:filter(
fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
emqx_bridge_v2_schema:action_resource_opts_fields()
).
fields(s3_aggregated_upload_parameters) ->
lists:append([
[
{mode,
hoconsc:mk(
aggregated,
#{
required => true,
desc => ?DESC(s3_aggregated_upload_mode)
}
)},
{container,
hoconsc:mk(
mkunion(type, #{
<<"csv">> => ?REF(s3_aggregated_container_csv)
}),
#{
required => true,
default => #{<<"type">> => <<"csv">>},
desc => ?DESC(s3_aggregated_container)
}
)},
{aggregation,
hoconsc:mk(
?REF(s3_aggregation),
#{
required => true,
desc => ?DESC(s3_aggregation)
}
)}
],
emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
{key, #{desc => ?DESC(s3_aggregated_upload_key)}}
]),
emqx_s3_schema:fields(s3_uploader)
]);
fields(s3_aggregated_container_csv) ->
[
{type,
hoconsc:mk(
csv,
#{
required => true,
desc => ?DESC(s3_aggregated_container_csv)
}
)},
{column_order,
hoconsc:mk(
hoconsc:array(string()),
#{
required => false,
default => [],
desc => ?DESC(s3_aggregated_container_csv_column_order)
}
)}
];
fields(s3_aggregation) ->
[
%% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
{time_interval,
hoconsc:mk(
emqx_schema:duration_s(),
#{
required => false,
default => <<"30m">>,
desc => ?DESC(s3_aggregation_interval)
}
)},
{max_records,
hoconsc:mk(
pos_integer(),
#{
required => false,
default => <<"100000">>,
desc => ?DESC(s3_aggregation_max_records)
}
)}
];
fields(s3_upload_resource_opts) ->
%% NOTE: Aggregated action should benefit from generous batching defaults.
emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{default => ?DEFAULT_AGGREG_BATCH_SIZE}},
{batch_time, #{default => ?DEFAULT_AGGREG_BATCH_TIME}}
]).
mkunion(Field, Schemas) ->
hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
scunion(_Field, Schemas, all_union_members) ->
maps:values(Schemas);
scunion(Field, Schemas, {value, Value}) ->
Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
{ok, Schema} ->
[Schema];
_Error ->
throw(#{field_name => Field, expected => maps:keys(Schemas)})
end.
desc(s3) ->
?DESC(s3_upload);
desc(Name) when
Name == s3_upload;
Name == s3_upload_parameters
Name == s3_direct_upload_parameters;
Name == s3_aggregated_upload_parameters;
Name == s3_aggregation;
Name == s3_aggregated_container_csv
->
?DESC(Name);
desc(s3_action_resource_opts) ->
desc(s3_upload_resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
convert_actions(Conf = #{}, Opts) ->
maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf);
convert_actions(undefined, _) ->
undefined.
convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) ->
case Params of
#{<<"mode">> := <<"direct">>} ->
%% NOTE: Disable batching for direct uploads.
NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0},
Conf#{<<"resource_opts">> := NResourceOpts};
#{} ->
Conf
end.
%% Interpreting options
-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
mk_key_template(#{key := Key}) ->
Template = emqx_template:parse(Key),
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
mk_suffix_template(UsedBindings) ->
RequiredBindings = ["action", "node", "datetime.", "sequence"],
SuffixBindings = [
mk_default_binding(RB)
|| RB <- RequiredBindings,
lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
],
SuffixTemplate = [["/", B] || B <- SuffixBindings],
emqx_template:parse(SuffixTemplate).
mk_default_binding("datetime.") ->
"${datetime.rfc3339utc}";
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
mk_upload_options(Parameters) ->
Headers = mk_upload_headers(Parameters),
#{
headers => Headers,
acl => maps:get(acl, Parameters, undefined)
}.
mk_upload_headers(Parameters = #{container := Container}) ->
Headers = normalize_headers(maps:get(headers, Parameters, #{})),
ContainerHeaders = mk_container_headers(Container),
maps:merge(ContainerHeaders, Headers).
normalize_headers(Headers) ->
maps:fold(
fun(Header, Value, Acc) ->
maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
end,
#{},
Headers
).
mk_container_headers(#{type := csv}) ->
#{"content-type" => "text/csv"};
mk_container_headers(#{}) ->
#{}.
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"s3">> => #{
summary => <<"S3 Simple Upload">>,
value => s3_upload_action_example(Method)
summary => <<"S3 Direct Upload">>,
value => s3_upload_action_example(Method, direct)
},
<<"s3_aggreg">> => #{
summary => <<"S3 Aggregated Upload">>,
value => s3_upload_action_example(Method, aggreg)
}
}
].
s3_upload_action_example(post) ->
s3_upload_action_example(post, Mode) ->
maps:merge(
s3_upload_action_example(put),
s3_upload_action_example(put, Mode),
#{
type => atom_to_binary(?ACTION_UPLOAD),
name => <<"my_s3_action">>
name => <<"my_s3_action">>,
enable => true,
connector => <<"my_s3_connector">>
}
);
s3_upload_action_example(get) ->
s3_upload_action_example(get, Mode) ->
maps:merge(
s3_upload_action_example(put),
s3_upload_action_example(put, Mode),
#{
enable => true,
connector => <<"my_s3_connector">>,
status => <<"connected">>,
node_status => [
#{
@ -125,12 +324,11 @@ s3_upload_action_example(get) ->
]
}
);
s3_upload_action_example(put) ->
s3_upload_action_example(put, direct) ->
#{
enable => true,
connector => <<"my_s3_connector">>,
description => <<"My action">>,
description => <<"My upload action">>,
parameters => #{
mode => <<"direct">>,
bucket => <<"${clientid}">>,
key => <<"${topic}">>,
content => <<"${payload}">>,
@ -140,4 +338,27 @@ s3_upload_action_example(put) ->
query_mode => <<"sync">>,
inflight_window => 10
}
};
s3_upload_action_example(put, aggreg) ->
#{
description => <<"My aggregated upload action">>,
parameters => #{
mode => <<"aggregated">>,
bucket => <<"mqtt-aggregated">>,
key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
acl => <<"public_read">>,
aggregation => #{
time_interval => <<"15m">>,
max_records => 100_000
},
<<"container">> => #{
type => <<"csv">>,
column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
}
},
resource_opts => #{
health_check_interval => <<"10s">>,
query_mode => <<"async">>,
inflight_window => 100
}
}.

View File

@ -108,6 +108,7 @@ action_config(Name, ConnectorId) ->
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> => #{
<<"mode">> => <<"direct">>,
<<"bucket">> => <<"${clientid}">>,
<<"key">> => <<"${topic}">>,
<<"content">> => <<"${payload}">>,
@ -122,6 +123,8 @@ action_config(Name, ConnectorId) ->
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"60s">>,
<<"batch_size">> => 42,
<<"batch_time">> => <<"100ms">>,
<<"resume_interval">> => <<"3s">>,
<<"worker_pool_size">> => <<"4">>
}
@ -131,6 +134,13 @@ action_config(Name, ConnectorId) ->
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
t_ignore_batch_opts(Config) ->
{ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
?assertMatch(
#{<<"resource_opts">> := #{<<"batch_size">> := 1, <<"batch_time">> := 0}},
Bridge
).
t_start_broken_update_restart(Config) ->
Name = ?config(connector_name, Config),
Type = ?config(connector_type, Config),

View File

@ -14,7 +14,7 @@
-import(emqx_utils_conv, [bin/1]).
%% See `emqx_bridge_s3.hrl`.
-define(BRIDGE_TYPE, <<"s3_aggregated_upload">>).
-define(BRIDGE_TYPE, <<"s3">>).
-define(CONNECTOR_TYPE, <<"s3">>).
-define(PROXY_NAME, "minio_tcp").
@ -122,6 +122,7 @@ action_config(Name, ConnectorId, Bucket) ->
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> => #{
<<"mode">> => <<"aggregated">>,
<<"bucket">> => unicode:characters_to_binary(Bucket),
<<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>,
<<"acl">> => <<"public_read">>,

View File

@ -36,6 +36,7 @@
-define(CONF, conf).
-define(AUDIT_MOD, audit).
-define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>).
-define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>).
-dialyzer({no_match, [load/0]}).
@ -208,7 +209,7 @@ hidden_roots() ->
<<"stats">>,
<<"broker">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"durable_sessions">>,
<<"plugins">>,
<<"zones">>
].
@ -330,6 +331,10 @@ update_config_cluster(
#{mode := merge} = Opts
) ->
check_res(Key, emqx_authn:merge_config(Conf), Conf, Opts);
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := merge} = Opts) ->
check_res(Key, emqx_conf:update([Key], {merge, NewConf}, ?OPTIONS), NewConf, Opts);
update_config_cluster(?SCHEMA_VALIDATION_CONF_ROOT_BIN = Key, NewConf, #{mode := replace} = Opts) ->
check_res(Key, emqx_conf:update([Key], {replace, NewConf}, ?OPTIONS), NewConf, Opts);
update_config_cluster(Key, NewConf, #{mode := merge} = Opts) ->
Merged = merge_conf(Key, NewConf),
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Opts);

View File

@ -72,6 +72,7 @@
]).
-define(GAUGE_SAMPLER_LIST, [
durable_subscriptions,
subscriptions,
topics,
connections,

View File

@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) ->
Fun =
fun
%% cluster-synced values
(durable_subscriptions, V, NCluster) ->
NCluster#{durable_subscriptions => V};
(topics, V, NCluster) ->
NCluster#{topics => V};
(retained_msg_count, V, NCluster) ->
@ -416,6 +418,7 @@ getstats(Key) ->
end.
stats(connections) -> emqx_stats:getstat('connections.count');
stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count');
stats(live_connections) -> emqx_stats:getstat('live_connections.count');
stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count');
stats(topics) -> emqx_stats:getstat('topics.count');

View File

@ -194,8 +194,11 @@ swagger_desc(validation_failed) ->
swagger_desc_format("Schema validations failed ");
swagger_desc(persisted) ->
swagger_desc_format("Messages saved to the durable storage ");
swagger_desc(durable_subscriptions) ->
<<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(subscriptions) ->
<<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
<<"Subscriptions at the time of sampling (not considering durable sessions).",
?APPROXIMATE_DESC>>;
swagger_desc(topics) ->
<<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(connections) ->

View File

@ -25,6 +25,7 @@
request/3,
request/4,
request/5,
request/6,
multipart_formdata_request/3,
multipart_formdata_request/4,
host/0,
@ -77,8 +78,11 @@ request(Username, Method, Url, Body) ->
request(Username, <<"public">>, Method, Url, Body).
request(Username, Password, Method, Url, Body) ->
request(Username, Password, Method, Url, Body, #{}).
request(Username, Password, Method, Url, Body0, Headers) ->
Request =
case Body of
case Body0 of
[] when
Method =:= get orelse Method =:= put orelse
Method =:= head orelse Method =:= delete orelse
@ -86,8 +90,10 @@ request(Username, Password, Method, Url, Body) ->
->
{Url, [auth_header(Username, Password)]};
_ ->
{Url, [auth_header(Username, Password)], "application/json",
emqx_utils_json:encode(Body)}
ContentType = maps:get("content-type", Headers, "application/json"),
HeadersList = maps:to_list(maps:without(["content-type"], Headers)),
Body = maybe_encode(Body0),
{Url, [auth_header(Username, Password) | HeadersList], ContentType, Body}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
@ -99,6 +105,9 @@ request(Username, Password, Method, Url, Body) ->
{error, Reason}
end.
maybe_encode(Body) when is_binary(Body) -> Body;
maybe_encode(Body) -> emqx_utils_json:encode(Body).
host() ->
?HOST.

View File

@ -82,7 +82,7 @@ init_per_group(persistent_sessions = Group, Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
{emqx, "session_persistence {enable = true}"},
{emqx, "durable_sessions {enable = true}"},
{emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(
@ -345,7 +345,8 @@ t_persistent_session_stats(_Config) ->
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"subscriptions">> := 8
<<"durable_subscriptions">> := 4,
<<"subscriptions">> := 4
}},
request(["monitor_current"])
)

View File

@ -186,7 +186,7 @@ prometheus_per_db(NodeOrAggr) ->
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_store_batch_time =>
%% [{[{db, emqx_persistent_message}], 42}],
%% [{[{db, messages}], 42}],
%% ...
%% '''
%%
@ -222,11 +222,11 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) ->
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_egress_batches =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}],
%% [{[{db,messages},{shard,<<"1">>}],99408},
%% {[{db,messages},{shard,<<"0">>}],99409}],
%% emqx_ds_egress_batches_retry =>
%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0},
%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}],
%% [{[{db,messages},{shard,<<"1">>}],0},
%% {[{db,messages},{shard,<<"0">>}],0}],
%% emqx_ds_egress_messages =>
%% ...
%% }

View File

@ -35,6 +35,7 @@
sites/0,
node/1,
this_site/0,
forget_site/1,
print_status/0
]).
@ -75,7 +76,8 @@
update_replica_set_trans/3,
update_db_config_trans/2,
drop_db_trans/1,
claim_site/2,
claim_site_trans/2,
forget_site_trans/1,
n_shards/1
]).
@ -131,7 +133,7 @@
-type transition() :: {add | del, site()}.
-type update_cluster_result() ::
ok
{ok, unchanged | [site()]}
| {error, {nonexistent_db, emqx_ds:db()}}
| {error, {nonexistent_sites, [site()]}}
| {error, {too_few_sites, [site()]}}
@ -153,6 +155,11 @@
erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
).
-define(NODE_PAT(NODE),
%% Equivalent of `#?NODE_TAB{node = NODE, _ = '_'}`:
erlang:make_tuple(record_info(size, ?NODE_TAB), '_', [{#?NODE_TAB.node, NODE}])
).
-define(SHARD_PAT(SHARD),
%% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}`
erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
@ -164,32 +171,89 @@
-spec print_status() -> ok.
print_status() ->
io:format("THIS SITE:~n~s~n", [this_site()]),
io:format("THIS SITE:~n"),
try this_site() of
Site -> io:format("~s~n", [Site])
catch
error:badarg ->
io:format(
"(!) UNCLAIMED~n"
"(!) Likely this node's name is already known as another site in the cluster.~n"
"(!) Please resolve conflicts manually.~n"
)
end,
io:format("~nSITES:~n", []),
Nodes = [node() | nodes()],
lists:foreach(
fun(#?NODE_TAB{site = Site, node = Node}) ->
Status =
case lists:member(Node, Nodes) of
true -> up;
false -> down
case mria:cluster_status(Node) of
running -> " up";
stopped -> "(x) down";
false -> "(!) UNIDENTIFIED"
end,
io:format("~s ~p ~p~n", [Site, Node, Status])
io:format("~s ~p ~s~n", [Site, Node, Status])
end,
eval_qlc(mnesia:table(?NODE_TAB))
),
Shards = eval_qlc(mnesia:table(?SHARD_TAB)),
io:format(
"~nSHARDS:~nId Replicas~n", []
"~nSHARDS:~n~s~s~n",
[string:pad("Shard", 30), "Replicas"]
),
lists:foreach(
fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
io:format("~s ~s~n", [ShardStr, ReplicasStr])
fun(#?SHARD_TAB{shard = DBShard, replica_set = RS}) ->
ShardStr = format_shard(DBShard),
ReplicasStr = string:join([format_replica(R) || R <- RS], " "),
io:format(
"~s~s~n",
[string:pad(ShardStr, 30), ReplicasStr]
)
end,
eval_qlc(mnesia:table(?SHARD_TAB))
Shards
),
PendingTransitions = lists:filtermap(
fun(Record = #?SHARD_TAB{shard = DBShard}) ->
case compute_transitions(Record) of
[] -> false;
Transitions -> {true, {DBShard, Transitions}}
end
end,
Shards
),
PendingTransitions /= [] andalso
io:format(
"~nREPLICA TRANSITIONS:~n~s~s~n",
[string:pad("Shard", 30), "Transitions"]
),
lists:foreach(
fun({DBShard, Transitions}) ->
ShardStr = format_shard(DBShard),
TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "),
io:format(
"~s~s~n",
[string:pad(ShardStr, 30), TransStr]
)
end,
PendingTransitions
).
format_shard({DB, Shard}) ->
io_lib:format("~p/~s", [DB, Shard]).
format_replica(Site) ->
Marker =
case mria:cluster_status(?MODULE:node(Site)) of
running -> " ";
stopped -> "(x)";
false -> "(!)"
end,
io_lib:format("~s ~s", [Marker, Site]).
format_transition({add, Site}) ->
io_lib:format("+~s", [Site]);
format_transition({del, Site}) ->
io_lib:format("-~s", [Site]).
-spec this_site() -> site().
this_site() ->
persistent_term:get(?emqx_ds_builtin_site).
@ -256,6 +320,15 @@ node(Site) ->
undefined
end.
-spec forget_site(site()) -> ok | {error, _}.
forget_site(Site) ->
case mnesia:dirty_read(?NODE_TAB, Site) of
[] ->
{error, nonexistent_site};
[Record] ->
transaction(fun ?MODULE:forget_site_trans/1, [Record])
end.
%%===============================================================================
%% DB API
%%===============================================================================
@ -314,8 +387,8 @@ db_sites(DB) ->
[transition()] | undefined.
replica_set_transitions(DB, Shard) ->
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
[#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}] ->
compute_transitions(TargetSet, ReplicaSet);
[Record] ->
compute_transitions(Record);
[] ->
undefined
end.
@ -374,6 +447,7 @@ unsubscribe(Pid) ->
init([]) ->
process_flag(trap_exit, true),
logger:set_process_metadata(#{domain => [ds, meta]}),
ok = ekka:monitor(membership),
ensure_tables(),
ensure_site(),
S = #s{},
@ -395,6 +469,9 @@ handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}},
{noreply, S};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) ->
{noreply, handle_unsubscribe(Pid, S)};
handle_info({membership, {node, leaving, Node}}, S) ->
forget_node(Node),
{noreply, S};
handle_info(_Info, S) ->
{noreply, S}.
@ -420,13 +497,6 @@ open_db_trans(DB, CreateOpts) ->
-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
allocate_shards_trans(DB) ->
Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
case length(Nodes) of
N when N >= NSites ->
ok;
_ ->
mnesia:abort({insufficient_sites_online, NSites, Nodes})
end,
case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
[] ->
ok;
@ -434,6 +504,13 @@ allocate_shards_trans(DB) ->
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
mnesia:abort({shards_already_allocated, ShardsAllocated})
end,
Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
case length(Nodes) of
N when N >= NSites ->
ok;
_ ->
mnesia:abort({insufficient_sites_online, NSites, Nodes})
end,
Shards = gen_shards(NShards),
Sites = [S || #?NODE_TAB{site = S} <- Nodes],
Allocation = compute_allocation(Shards, Sites, Opts),
@ -449,7 +526,7 @@ allocate_shards_trans(DB) ->
Allocation
).
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> {ok, [site()]}.
assign_db_sites_trans(DB, Sites) ->
Opts = db_config_trans(DB),
case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
@ -464,23 +541,24 @@ assign_db_sites_trans(DB, Sites) ->
%% Optimize reallocation. The goals are:
%% 1. Minimize the number of membership transitions.
%% 2. Ensure that sites are responsible for roughly the same number of shards.
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
Shards = db_shards_trans(DB),
Reallocation = compute_allocation(Shards, Sites, Opts),
lists:foreach(
ok = lists:foreach(
fun({Record, ReplicaSet}) ->
ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet})
end,
Reallocation
).
),
{ok, Sites}.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
modify_db_sites_trans(DB, Modifications) ->
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
Shards = db_shards_trans(DB),
Sites0 = list_db_target_sites(Shards),
Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
case Sites of
Sites0 ->
ok;
{ok, unchanged};
_Changed ->
assign_db_sites_trans(DB, Sites)
end.
@ -531,15 +609,40 @@ db_config_trans(DB, LockType) ->
mnesia:abort({nonexistent_db, DB})
end.
db_shards_trans(DB) ->
mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write).
-spec drop_db_trans(emqx_ds:db()) -> ok.
drop_db_trans(DB) ->
mnesia:delete({?META_TAB, DB}),
[mnesia:delete({?SHARD_TAB, Shard}) || Shard <- shards(DB)],
ok.
-spec claim_site(site(), node()) -> ok.
claim_site(Site, Node) ->
mnesia:write(#?NODE_TAB{site = Site, node = Node}).
-spec claim_site_trans(site(), node()) -> ok.
claim_site_trans(Site, Node) ->
case node_sites(Node) of
[] ->
mnesia:write(#?NODE_TAB{site = Site, node = Node});
[#?NODE_TAB{site = Site}] ->
ok;
Records ->
ExistingSites = [S || #?NODE_TAB{site = S} <- Records],
mnesia:abort({conflicting_node_site, ExistingSites})
end.
-spec forget_site_trans(_Record :: tuple()) -> ok.
forget_site_trans(Record = #?NODE_TAB{site = Site}) ->
DBs = mnesia:all_keys(?META_TAB),
SiteDBs = [DB || DB <- DBs, S <- list_db_target_sites(db_shards_trans(DB)), S == Site],
case SiteDBs of
[] ->
mnesia:delete_object(?NODE_TAB, Record, write);
[_ | _] ->
mnesia:abort({member_of_replica_sets, SiteDBs})
end.
node_sites(Node) ->
mnesia:dirty_match_object(?NODE_TAB, ?NODE_PAT(Node)).
%%================================================================================
%% Internal functions
@ -582,9 +685,22 @@ ensure_site() ->
io:format(FD, "~p.", [Site]),
file:close(FD)
end,
{atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
persistent_term:put(?emqx_ds_builtin_site, Site),
ok.
case transaction(fun ?MODULE:claim_site_trans/2, [Site, node()]) of
ok ->
persistent_term:put(?emqx_ds_builtin_site, Site);
{error, Reason} ->
logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason])
end.
forget_node(Node) ->
Sites = node_sites(Node),
Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
case [Reason || {error, Reason} <- Results] of
[] ->
ok;
Errors ->
logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors])
end.
%% @doc Returns sorted list of sites shards are replicated across.
-spec list_db_sites([_Shard]) -> [site()].
@ -624,6 +740,9 @@ compute_allocation(Shards, Sites, Opts) ->
),
Allocation.
compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
compute_transitions(TargetSet, ReplicaSet).
compute_transitions(undefined, _ReplicaSet) ->
[];
compute_transitions(TargetSet, ReplicaSet) ->

View File

@ -191,7 +191,7 @@ handle_shard_transitions(Shard, [Trans | _Rest], State) ->
end.
transition_handler(Shard, Trans, _State = #{db := DB}) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
case Trans of
{add, ThisSite} ->
{Shard, fun trans_add_local/3};

View File

@ -239,7 +239,7 @@ t_rebalance(Config) ->
),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
]),
@ -297,12 +297,12 @@ t_join_leave_errors(Config) ->
),
%% NOTE: Leaving a non-existent site is not an error.
?assertEqual(
ok,
{ok, unchanged},
ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
%% Impossible to leave the last site.
@ -312,13 +312,13 @@ t_join_leave_errors(Config) ->
),
%% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
t_rebalance_chaotic_converges(init, Config) ->
@ -457,7 +457,7 @@ t_rebalance_offline_restarts(Config) ->
%% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]),

View File

@ -164,8 +164,8 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
->
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
%% Apply the transition.
?assertEqual(
ok,
?assertMatch(
{ok, _},
?ON(
Node,
emqx_ds_replication_layer_meta:Operation(DB, Arg)

View File

@ -119,7 +119,11 @@ handle_call({update, License}, _From, #{license := Old} = State) ->
ok = log_new_license(Old, License),
{reply, check_license(License), State1#{license => License}};
handle_call(dump, _From, #{license := License} = State) ->
{reply, emqx_license_parser:dump(License), State};
Dump0 = emqx_license_parser:dump(License),
%% resolve the current dynamic limit
MaybeDynamic = get_max_connections(License),
Dump = lists:keyreplace(max_connections, 1, Dump0, {max_connections, MaybeDynamic}),
{reply, Dump, State};
handle_call(expiry_epoch, _From, #{license := License} = State) ->
ExpiryEpoch = date_to_expiry_epoch(emqx_license_parser:expiry_date(License)),
{reply, ExpiryEpoch, State};

View File

@ -149,7 +149,7 @@ error_msg(Code, Msg) ->
'/license/setting'(get, _Params) ->
{200, get_setting()};
'/license/setting'(put, #{body := Setting}) ->
case emqx_license:update_setting(Setting) of
case update_setting(Setting) of
{error, Error} ->
?SLOG(
error,
@ -165,6 +165,12 @@ error_msg(Code, Msg) ->
'/license/setting'(get, undefined)
end.
update_setting(Setting) when is_map(Setting) ->
emqx_license:update_setting(Setting);
update_setting(_Setting) ->
%% TODO: EMQX-12401 content-type enforcement by framework
{error, "bad content-type"}.
fields(key_license) ->
[lists:keyfind(key, 1, emqx_license_schema:fields(key_license))].

View File

@ -57,7 +57,12 @@ end_per_testcase(_TestCase, _Config) ->
%%------------------------------------------------------------------------------
request(Method, Uri, Body) ->
emqx_dashboard_api_test_helpers:request(<<"license_admin">>, Method, Uri, Body).
request(Method, Uri, Body, #{}).
request(Method, Uri, Body, Headers) ->
emqx_dashboard_api_test_helpers:request(
<<"license_admin">>, <<"public">>, Method, Uri, Body, Headers
).
uri(Segments) ->
emqx_dashboard_api_test_helpers:uri(Segments).
@ -229,24 +234,44 @@ t_license_setting(_Config) ->
t_license_setting_bc(_Config) ->
%% Create a BC license
Key = emqx_license_test_lib:make_license(#{customer_type => "3"}),
Key = emqx_license_test_lib:make_license(#{
customer_type => "3",
max_connections => "33"
}),
Res = request(post, uri(["license"]), #{key => Key}),
?assertMatch({ok, 200, _}, Res),
%% for bc customer, before setting dynamic limit,
%% the default limit is always 25, as if no license
?assertMatch(#{<<"max_connections">> := 25}, request_dump()),
%% get
GetRes = request(get, uri(["license", "setting"]), []),
%% aslo check that the settings return correctly
validate_setting(GetRes, <<"75%">>, <<"80%">>, 25),
%% update
Low = <<"50%">>,
High = <<"55%">>,
UpdateRes = request(put, uri(["license", "setting"]), #{
Settings = #{
<<"connection_low_watermark">> => Low,
<<"connection_high_watermark">> => High,
<<"dynamic_max_connections">> => 26
}),
},
UpdateRes = request(put, uri(["license", "setting"]), Settings),
%% assert it's changed to 26
validate_setting(UpdateRes, Low, High, 26),
?assertMatch(#{<<"max_connections">> := 26}, request_dump()),
?assertEqual(26, emqx_config:get([license, dynamic_max_connections])),
%% Try to set it beyond the limit, it's allowed, but no effect
Settings2 = Settings#{<<"dynamic_max_connections">> => 99999},
UpdateRes2 = request(put, uri(["license", "setting"]), Settings2),
validate_setting(UpdateRes2, Low, High, 99999),
?assertMatch(#{<<"max_connections">> := 33}, request_dump()),
?assertEqual(99999, emqx_config:get([license, dynamic_max_connections])),
ok.
request_dump() ->
{ok, 200, DumpJson} = request(get, uri(["license"]), []),
emqx_utils_json:decode(DumpJson).
validate_setting(Res, ExpectLow, ExpectHigh) ->
?assertMatch({ok, 200, _}, Res),
{ok, 200, Payload} = Res,

View File

@ -270,6 +270,12 @@ get_metrics() ->
get_metrics(Node) ->
unwrap_rpc(emqx_proto_v1:get_metrics(Node)).
aggregated_only_keys() ->
[
'durable_subscriptions.count',
'durable_subscriptions.max'
].
get_stats() ->
GlobalStatsKeys =
[
@ -294,7 +300,7 @@ get_stats() ->
emqx:running_nodes()
)
),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(emqx_stats:getstats())),
maps:merge(CountStats, GlobalStats).
delete_keys(List, []) ->
@ -303,7 +309,12 @@ delete_keys(List, [Key | Keys]) ->
delete_keys(proplists:delete(Key, List), Keys).
get_stats(Node) ->
unwrap_rpc(emqx_proto_v1:get_stats(Node)).
case unwrap_rpc(emqx_proto_v1:get_stats(Node)) of
{error, _} = Error ->
Error;
Stats when is_list(Stats) ->
delete_keys(Stats, aggregated_only_keys())
end.
nodes_info_count(PropList) ->
NodeCount =

View File

@ -136,6 +136,7 @@ schema("/clients_v2") ->
'operationId' => list_clients_v2,
get => #{
description => ?DESC(list_clients),
hidden => true,
tags => ?TAGS,
parameters => fields(list_clients_v2_inputs),
responses => #{
@ -1529,13 +1530,13 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
check_for_live_and_expired(Rows) ->
lists:filtermap(
fun({ClientId, Session}) ->
fun({ClientId, _Session}) ->
case is_live_session(ClientId) of
true ->
false;
false ->
DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
{true, {ClientId, DSSession#{is_expired => is_expired(Session)}}}
{true, {ClientId, DSSession}}
end
end,
Rows
@ -1755,18 +1756,32 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
format_persistent_session_info(ClientId, PSInfo0).
format_persistent_session_info(
_ClientId, #{metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats}}} = PSInfo
_ClientId,
#{
metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats} = OfflineInfo} =
Metadata
} =
PSInfo
) ->
Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{
fields => all
}),
Info0#{
connected => false,
durable => true,
is_persistent => true,
is_expired => maps:get(is_expired, PSInfo, false),
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
};
LastConnectedToNode = maps:get(last_connected_to, OfflineInfo, undefined),
DisconnectedAt = maps:get(disconnected_at, OfflineInfo, undefined),
%% `created_at' and `connected_at' have already been formatted by this point.
Info = result_format_time_fun(
disconnected_at,
Info0#{
connected => false,
disconnected_at => DisconnectedAt,
durable => true,
is_persistent => true,
is_expired => is_expired(Metadata),
node => LastConnectedToNode,
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
}
),
result_format_undefined_to_null(Info);
format_persistent_session_info(ClientId, PSInfo0) ->
Metadata = maps:get(metadata, PSInfo0, #{}),
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
@ -1786,7 +1801,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
connected_at => CreatedAt,
durable => true,
ip_address => IpAddress,
is_expired => maps:get(is_expired, PSInfo0, false),
is_expired => is_expired(Metadata),
is_persistent => true,
port => Port,
heap_size => 0,

View File

@ -21,6 +21,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
@ -35,7 +36,9 @@
update_db_sites/3,
join/3,
leave/3
leave/3,
forget/2
]).
%% behavior callbacks:
@ -222,7 +225,7 @@ fields(sites_shard) ->
atom(),
#{
desc => <<"Durable storage ID">>,
example => 'emqx_persistent_message'
example => ?PERSISTENT_MESSAGE_DB
}
)},
{id,
@ -249,7 +252,7 @@ fields(db) ->
atom(),
#{
desc => <<"Name of the durable storage">>,
example => 'emqx_persistent_message'
example => ?PERSISTENT_MESSAGE_DB
}
)},
{shards,
@ -323,17 +326,11 @@ get_db(get, #{bindings := #{ds := DB}}) ->
}).
db_replicas(get, #{bindings := #{ds := DB}}) ->
Replicas = lists:flatmap(
fun(Shard) ->
#{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
maps:keys(RS)
end,
emqx_ds_replication_layer_meta:shards(DB)
),
?OK(lists:usort(Replicas));
Replicas = emqx_ds_replication_layer_meta:db_sites(DB),
?OK(Replicas);
db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
case update_db_sites(DB, Sites, rest) of
ok ->
{ok, _} ->
{202, <<"OK">>};
{error, Description} ->
?BAD_REQUEST(400, Description)
@ -341,21 +338,23 @@ db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
case join(DB, Site, rest) of
ok ->
{ok, _} ->
{202, <<"OK">>};
{error, Description} ->
?BAD_REQUEST(400, Description)
end;
db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
case leave(DB, Site, rest) of
ok ->
{ok, Sites} when is_list(Sites) ->
{202, <<"OK">>};
{ok, unchanged} ->
?NOT_FOUND(<<"Site is not part of replica set">>);
{error, Description} ->
?BAD_REQUEST(400, Description)
end.
-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
ok | {error, binary()}.
{ok, [emqx_ds_replication_layer_meta:site()]} | {error, _}.
update_db_sites(DB, Sites, Via) when is_list(Sites) ->
?SLOG(warning, #{
msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
@ -364,20 +363,30 @@ update_db_sites(DB, Sites, Via) when is_list(Sites) ->
update_db_sites(_, _, _) ->
{error, <<"Bad type">>}.
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
join(DB, Site, Via) ->
?SLOG(warning, #{
msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)).
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
leave(DB, Site, Via) ->
?SLOG(warning, #{
msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
-spec forget(emqx_ds_replication_layer_meta:site(), rest | cli) ->
ok | {error, _}.
forget(Site, Via) ->
?SLOG(warning, #{
msg => "durable_storage_forget_request", site => Site, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:forget_site(Site)).
%%================================================================================
%% Internal functions
%%================================================================================
@ -405,7 +414,7 @@ param_storage_id() ->
required => true,
in => path,
desc => <<"Durable storage ID">>,
example => emqx_persistent_message
example => ?PERSISTENT_MESSAGE_DB
},
{ds, mk(enum(dbs()), Info)}.
@ -418,7 +427,7 @@ example_site() ->
end.
dbs() ->
[emqx_persistent_message].
[?PERSISTENT_MESSAGE_DB].
shards_of_site(Site) ->
lists:flatmap(
@ -468,14 +477,20 @@ list_shards(DB) ->
|| Shard <- emqx_ds_replication_layer_meta:shards(DB)
].
meta_result_to_binary(ok) ->
ok;
meta_result_to_binary(Ok) when Ok == ok orelse element(1, Ok) == ok ->
Ok;
meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
{error, iolist_to_binary(Msg)};
meta_result_to_binary({error, {nonexistent_db, DB}}) ->
IOList = io_lib:format("Unknown storage: ~p", [DB]),
{error, iolist_to_binary(IOList)};
meta_result_to_binary({error, nonexistent_site}) ->
{error, <<"Unknown site">>};
meta_result_to_binary({error, {member_of_replica_sets, DBNames}}) ->
DBs = lists:map(fun atom_to_binary/1, DBNames),
Msg = ["Site is still a member of replica sets of: " | lists:join(", ", DBs)],
{error, iolist_to_binary(Msg)};
meta_result_to_binary({error, Err}) ->
IOList = io_lib:format("Error: ~p", [Err]),
{error, iolist_to_binary(IOList)}.

View File

@ -122,7 +122,7 @@ schema("/nodes/:node/stats") ->
responses =>
#{
200 => mk(
ref(?NODE_STATS_MODULE, node_stats_data),
ref(?NODE_STATS_MODULE, aggregated_data),
#{desc => <<"Get node stats successfully">>}
),
404 => not_found()

View File

@ -60,8 +60,8 @@ schema("/stats") ->
#{
200 => mk(
hoconsc:union([
ref(?MODULE, node_stats_data),
array(ref(?MODULE, aggergate_data))
array(ref(?MODULE, per_node_data)),
ref(?MODULE, aggregated_data)
]),
#{desc => <<"List stats ok">>}
)
@ -82,7 +82,7 @@ fields(aggregate) ->
}
)}
];
fields(node_stats_data) ->
fields(aggregated_data) ->
[
stats_schema('channels.count', <<"sessions.count">>),
stats_schema('channels.max', <<"session.max">>),
@ -106,7 +106,10 @@ fields(node_stats_data) ->
stats_schema('subscribers.max', <<"Historical maximum number of subscribers">>),
stats_schema(
'subscriptions.count',
<<"Number of current subscriptions, including shared subscriptions">>
<<
"Number of current subscriptions, including shared subscriptions,"
" but not subscriptions from durable sessions"
>>
),
stats_schema('subscriptions.max', <<"Historical maximum number of subscriptions">>),
stats_schema('subscriptions.shared.count', <<"Number of current shared subscriptions">>),
@ -116,14 +119,18 @@ fields(node_stats_data) ->
stats_schema('topics.count', <<"Number of current topics">>),
stats_schema('topics.max', <<"Historical maximum number of topics">>)
];
fields(aggergate_data) ->
fields(per_node_data) ->
[
{node,
mk(string(), #{
desc => <<"Node name">>,
example => <<"emqx@127.0.0.1">>
})}
] ++ fields(node_stats_data).
})},
stats_schema(
'durable_subscriptions.count',
<<"Number of current subscriptions from durable sessions in the cluster">>
)
] ++ fields(aggregated_data).
stats_schema(Name, Desc) ->
{Name, mk(non_neg_integer(), #{desc => Desc, example => 0})}.

View File

@ -310,7 +310,12 @@ consume_n_matching(Map, Pred, N, S) ->
consume_n_matching(_Map, _Pred, _N, [], Acc) ->
{lists:reverse(Acc), []};
consume_n_matching(_Map, _Pred, 0, S, Acc) ->
{lists:reverse(Acc), S};
case emqx_utils_stream:next(S) of
[] ->
{lists:reverse(Acc), []};
_ ->
{lists:reverse(Acc), S}
end;
consume_n_matching(Map, Pred, N, S0, Acc) ->
case emqx_utils_stream:next(S0) of
[] ->
@ -396,11 +401,16 @@ merge_queries(QString0, Q1, Q2) ->
Q2Page = ceil(C1 / Limit),
case Page =< Q2Page of
true ->
#{data := Data, meta := #{hasnext := HN}} = Q1(QString0),
#{
data => Data,
meta => Meta#{hasnext => HN orelse C2 > 0}
};
#{data := Data1, meta := #{hasnext := HN1}} = Q1(QString0),
maybe_fetch_from_second_query(#{
rows1 => Data1,
limit => Limit,
hasnext1 => HN1,
meta => Meta,
count2 => C2,
query2 => Q2,
query_string => QString0
});
false ->
QString = QString0#{<<"page">> => Page - Q2Page},
#{data := Data, meta := #{hasnext := HN}} = Q2(QString),
@ -421,6 +431,31 @@ merge_queries(QString0, Q1, Q2) ->
}
end.
maybe_fetch_from_second_query(Params) ->
#{
rows1 := Data1,
limit := Limit,
hasnext1 := HN1,
meta := Meta,
count2 := C2,
query2 := Q2,
query_string := QString0
} = Params,
NumRows1 = length(Data1),
{Data, HN} =
case (NumRows1 >= Limit) orelse HN1 of
true ->
{Data1, HN1 orelse C2 > 0};
false ->
#{data := Data2, meta := #{hasnext := HN2}} =
Q2(QString0#{<<"limit">> := Limit - NumRows1}),
{Data1 ++ Data2, HN2}
end,
#{
data => Data,
meta => Meta#{hasnext => HN}
}.
resp_count(Query, QFun) ->
#{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}),
maps:get(count, Meta, undefined).

View File

@ -855,7 +855,7 @@ do_ds(["set_replicas", DBStr | SitesStr]) ->
{ok, DB} ->
Sites = lists:map(fun list_to_binary/1, SitesStr),
case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of
ok ->
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
@ -867,7 +867,9 @@ do_ds(["join", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of
ok ->
{ok, unchanged} ->
emqx_ctl:print("unchanged~n");
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
@ -879,7 +881,9 @@ do_ds(["leave", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of
ok ->
{ok, unchanged} ->
emqx_ctl:print("unchanged~n");
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
@ -887,13 +891,21 @@ do_ds(["leave", DBStr, Site]) ->
{error, _} ->
emqx_ctl:print("Unknown durable storage~n")
end;
do_ds(["forget", Site]) ->
case emqx_mgmt_api_ds:forget(list_to_binary(Site), cli) of
ok ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to forget site: ~s~n", [Description])
end;
do_ds(_) ->
emqx_ctl:usage([
{"ds info", "Show overview of the embedded durable storage state"},
{"ds set_replicas <storage> <site1> <site2> ...",
"Change the replica set of the durable storage"},
{"ds join <storage> <site>", "Add site to the replica set of the storage"},
{"ds leave <storage> <site>", "Remove site from the replica set of the storage"}
{"ds leave <storage> <site>", "Remove site from the replica set of the storage"},
{"ds forget <site>", "Forcefully remove a site from the list of known sites"}
]).
%%--------------------------------------------------------------------

View File

@ -65,7 +65,7 @@
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"session_persistence">>,
<<"durable_sessions">>,
<<"prometheus">>,
<<"crl_cache">>,
<<"conn_congestion">>,

View File

@ -46,7 +46,7 @@ groups() ->
init_per_group(persistence_disabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence { enable = false }"},
{emqx, "durable_sessions { enable = false }"},
emqx_management
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
@ -59,9 +59,9 @@ init_per_group(persistence_enabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" heartbeat_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
"}"},
emqx_management

View File

@ -24,7 +24,6 @@
-include_lib("proper/include/proper.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
all() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
@ -80,7 +79,7 @@ end_per_suite(Config) ->
init_per_group(persistent_sessions, Config) ->
AppSpecs = [
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management
],
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
@ -581,14 +580,33 @@ t_persistent_sessions6(Config) ->
%% Wait for session to be considered expired but not GC'ed
ct:sleep(2_000),
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
N1Bin = atom_to_binary(N1),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}},
{ok,
{{_, 200, _}, _, #{
<<"data">> := [
#{
<<"is_expired">> := true,
<<"node">> := N1Bin,
<<"disconnected_at">> := <<_/binary>>
}
]
}}},
list_request(APIPort)
)
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"is_expired">> := true,
<<"node">> := N1Bin,
<<"disconnected_at">> := <<_/binary>>
}}},
get_client_request(APIPort, ClientId)
),
C2 = connect_client(#{port => Port1, clientid => ClientId}),
disconnect_and_destroy_session(C2),

View File

@ -29,7 +29,7 @@ all() ->
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
@ -59,7 +59,7 @@ t_get_storages(_) ->
Path = api_path(["ds", "storages"]),
{ok, Response} = request_api(get, Path),
?assertEqual(
[<<"emqx_persistent_message">>],
[<<"messages">>],
emqx_utils_json:decode(Response, [return_maps])
).
@ -81,7 +81,7 @@ t_get_site(_) ->
<<"shards">> :=
[
#{
<<"storage">> := <<"emqx_persistent_message">>,
<<"storage">> := <<"messages">>,
<<"id">> := _,
<<"status">> := <<"up">>
}
@ -99,12 +99,12 @@ t_get_db(_) ->
request_api(get, Path400)
),
%% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message"]),
Path = api_path(["ds", "storages", "messages"]),
{ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertMatch(
#{
<<"name">> := <<"emqx_persistent_message">>,
<<"name">> := <<"messages">>,
<<"shards">> :=
[
#{
@ -132,7 +132,7 @@ t_get_replicas(_) ->
request_api(get, Path400)
),
%% Valid path:
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
Path = api_path(["ds", "storages", "messages", "replicas"]),
{ok, Response} = request_api(get, Path),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
?assertEqual(
@ -141,7 +141,7 @@ t_get_replicas(_) ->
).
t_put_replicas(_) ->
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
Path = api_path(["ds", "storages", "messages", "replicas"]),
%% Error cases:
?assertMatch(
{ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}},
@ -154,13 +154,13 @@ t_put_replicas(_) ->
).
t_join(_) ->
Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]),
Path400 = api_path(["ds", "storages", "messages", "replicas", "unknown_site"]),
?assertMatch(
{error, {_, 400, _}},
parse_error(request_api(put, Path400))
),
ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch(
{ok, "OK"},
request_api(put, Path)
@ -168,12 +168,20 @@ t_join(_) ->
t_leave(_) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
Path = api_path(["ds", "storages", "messages", "replicas", ThisSite]),
?assertMatch(
{error, {_, 400, _}},
request_api(delete, Path)
).
t_leave_notfound(_) ->
Site = "not_part_of_replica_set",
Path = api_path(["ds", "storages", "messages", "replicas", Site]),
?assertMatch(
{error, {_, 404, _}},
request_api(delete, Path)
).
parse_error({ok, Code, JSON}) ->
{ok, Code, emqx_utils_json:decode(JSON)};
parse_error(Err) ->

View File

@ -25,12 +25,22 @@ all() ->
init_per_suite(Config) ->
meck:expect(emqx, running_nodes, 0, [node(), 'fake@node']),
emqx_mgmt_api_test_util:init_suite(),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(_) ->
end_per_suite(Config) ->
Apps = proplists:get_value(apps, Config),
meck:unload(emqx),
emqx_mgmt_api_test_util:end_suite().
emqx_cth_suite:stop(Apps),
ok.
t_stats_api(_) ->
S = emqx_mgmt_api_test_util:api_path(["stats?aggregate=false"]),
@ -39,7 +49,8 @@ t_stats_api(_) ->
SystemStats1 = emqx_mgmt:get_stats(),
Fun1 =
fun(Key) ->
?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1))
?assertEqual(maps:get(Key, SystemStats1), maps:get(atom_to_binary(Key, utf8), Stats1)),
?assertNot(is_map_key(<<"durable_subscriptions.count">>, Stats1), #{stats => Stats1})
end,
lists:foreach(Fun1, maps:keys(SystemStats1)),
StatsPath = emqx_mgmt_api_test_util:api_path(["stats?aggregate=true"]),

View File

@ -20,6 +20,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
@ -42,18 +43,25 @@ all() ->
].
groups() ->
CommonTCs = emqx_common_test_helpers:all(?MODULE),
AllTCs = emqx_common_test_helpers:all(?MODULE),
CommonTCs = AllTCs -- persistent_only_tcs(),
[
{mem, CommonTCs},
%% Shared subscriptions are currently not supported:
{persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]}
{persistent,
(CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()}
].
persistent_only_tcs() ->
[
t_mixed_persistent_sessions
].
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
"durable_sessions {\n"
" enable = true\n"
" renew_streams_interval = 10ms\n"
"}"},
@ -158,6 +166,51 @@ t_subscription_api(Config) ->
SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
?assertEqual(length(SubscriptionsList2), 1).
%% Checks a few edge cases where persistent and non-persistent client subscriptions exist.
t_mixed_persistent_sessions(Config) ->
ClientConfig = ?config(client_config, Config),
PersistentClient = ?config(client, Config),
{ok, MemClient} = emqtt:start_link(ClientConfig#{clientid => <<"mem">>, properties => #{}}),
{ok, _} = emqtt:connect(MemClient),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(PersistentClient, <<"t/1">>, 1),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(MemClient, <<"t/1">>, 1),
%% First page with sufficient limit should have both mem and DS clients.
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 2
}
}}},
get_subs(#{page => "1"})
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{<<"hasnext">> := true}
}}},
get_subs(#{page => "1", limit => "1"})
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{<<"hasnext">> := false}
}}},
get_subs(#{page => "2", limit => "1"})
),
emqtt:disconnect(MemClient),
ok.
t_subscription_fuzzy_search(Config) ->
Client = proplists:get_value(client, Config),
Durable = atom_to_list(?config(durable, Config)),
@ -272,3 +325,42 @@ request_json(Method, Query, Headers) when is_list(Query) ->
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).
get_subs() ->
get_subs(_QueryParams = #{}).
get_subs(QueryParams = #{}) ->
QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))),
request(get, path(), [], QS).
request(Method, Path, Params) ->
request(Method, Path, Params, _QueryParams = "").
request(Method, Path, Params, QueryParams) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
Body = maybe_json_decode(Body0),
{ok, {Status, Headers, Body}};
{error, {Status, Headers, Body0}} ->
Body =
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
Msg = maybe_json_decode(Msg0),
Decoded0#{<<"message">> := Msg};
{ok, Decoded0} ->
Decoded0;
{error, _} ->
Body0
end,
{error, {Status, Headers, Body}};
Error ->
Error
end.
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> X
end.

View File

@ -27,7 +27,7 @@ all() ->
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence.enable = true"},
{emqx, "durable_sessions.enable = true"},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],

View File

@ -271,6 +271,16 @@ t_http_test_json_formatter(_Config) ->
}),
%% We should handle report style logging
?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}),
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
%% This will be converted to map
map_key => [{a, a}, {b, b}]
}),
?TRACE("CUSTOM", "my_log_msg", #{
topic => Topic,
%% We should not convert this to a map as we will lose information
map_key => [{a, a}, {a, b}]
}),
ok = emqx_trace_handler_SUITE:filesync(Name, topic),
{ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
{ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
@ -425,6 +435,25 @@ t_http_test_json_formatter(_Config) ->
},
NextFun()
),
?assertMatch(
#{
<<"meta">> := #{
<<"map_key">> := #{
<<"a">> := <<"a">>,
<<"b">> := <<"b">>
}
}
},
NextFun()
),
?assertMatch(
#{
<<"meta">> := #{
<<"map_key">> := [_, _]
}
},
NextFun()
),
{ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
?assertEqual(<<>>, Delete),

View File

@ -507,7 +507,13 @@ on_sql_query(
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
?TRACE("QUERY", "mysql_connector_received", LogMeta),
ChannelID = maps:get(channel_id, State, no_channel),
emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}),
emqx_trace:rendered_action_template(
ChannelID,
#{
sql_or_key => SQLOrKey,
parameters => Params
}
),
Worker = ecpool:get_client(PoolName),
case ecpool_worker:client(Worker) of
{ok, Conn} ->

View File

@ -331,6 +331,8 @@ emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D));
%% pub/sub stats
emqx_collect(K = emqx_durable_subscriptions_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_durable_subscriptions_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D));
emqx_collect(K = emqx_suboptions_count, D) -> gauge_metrics(?MG(K, D));
@ -541,6 +543,8 @@ stats_metric_meta() ->
{emqx_subscribers_max, gauge, 'subscribers.max'},
{emqx_subscriptions_count, gauge, 'subscriptions.count'},
{emqx_subscriptions_max, gauge, 'subscriptions.max'},
{emqx_durable_subscriptions_count, gauge, 'durable_subscriptions.count'},
{emqx_durable_subscriptions_max, gauge, 'durable_subscriptions.max'},
%% delayed
{emqx_delayed_count, gauge, 'delayed.count'},
{emqx_delayed_max, gauge, 'delayed.max'}

View File

@ -402,6 +402,8 @@ assert_json_data__stats(M, Mode) when
#{
emqx_connections_count := _,
emqx_connections_max := _,
emqx_durable_subscriptions_count := _,
emqx_durable_subscriptions_max := _,
emqx_live_connections_count := _,
emqx_live_connections_max := _,
emqx_sessions_count := _,

View File

@ -1174,12 +1174,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
{ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
{error, {unrecoverable_error, unhealthy_target}};
{ok, _Group, Resource} ->
PrevLoggerProcessMetadata = logger:get_process_metadata(),
QueryResult =
try
set_rule_id_trace_meta_data(Query),
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource)
after
unset_rule_id_trace_meta_data()
reset_logger_process_metadata(PrevLoggerProcessMetadata)
end,
QueryResult;
{error, not_found} ->
@ -1190,27 +1191,37 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
%% Get the rule ids from requests
RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
StopAfterRenderVal =
RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
RuleTriggerTimes = lists:flatten(RuleTriggerTimes0),
TraceMetadata =
case Requests of
%% We know that the batch is not mixed since we prevent this by
%% using a stop_after function in the replayq:pop call
[?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] ->
true;
#{
rule_ids => RuleIDs,
client_ids => ClientIDs,
rule_trigger_ts => RuleTriggerTimes,
stop_action_after_render => true
};
[?QUERY(_, _, _, _, _TraceCTX) | _] ->
false
#{
rule_ids => RuleIDs,
client_ids => ClientIDs,
rule_trigger_ts => RuleTriggerTimes
}
end,
logger:update_process_metadata(#{
rule_ids => RuleIDs,
client_ids => ClientIDs,
rule_trigger_times => RuleTriggerTimes,
stop_action_after_render => StopAfterRenderVal
}),
logger:update_process_metadata(TraceMetadata),
ok;
set_rule_id_trace_meta_data(Request) ->
set_rule_id_trace_meta_data([Request]),
ok.
reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
logger:unset_process_metadata();
reset_logger_process_metadata(PrevProcessMetadata) ->
logger:set_process_metadata(PrevProcessMetadata).
collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
Acc#{RuleId => true};
collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
@ -1221,19 +1232,11 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
Acc.
collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) ->
[Time | Acc];
collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
Acc.
unset_rule_id_trace_meta_data() ->
logger:update_process_metadata(#{
rule_ids => #{},
client_ids => #{},
stop_action_after_render => false,
rule_trigger_times => []
}).
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
extract_connector_id(Id) when is_binary(Id) ->
case binary:split(Id, <<":">>, [global]) of

View File

@ -402,7 +402,7 @@ retried_failed_inc(ID, Val) ->
retried_failed_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'retried.failed').
%% @doc Count messages that were sucessfully sent after at least one retry
%% @doc Count messages that were successfully sent after at least one retry
retried_success_inc(ID) ->
retried_success_inc(ID, 1).

View File

@ -319,6 +319,13 @@ fields("ctx_delivery_dropped") ->
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
];
fields("ctx_schema_validation_failed") ->
Event = 'schema.validation_failed',
[
{"event_type", event_type_sc(Event)},
{"validation", sc(binary(), #{desc => ?DESC("event_validation")})}
| msg_event_common_fields()
].
rule_input_message_context() ->
@ -337,7 +344,8 @@ rule_input_message_context() ->
ref("ctx_check_authz_complete"),
ref("ctx_check_authn_complete"),
ref("ctx_bridge_mqtt"),
ref("ctx_delivery_dropped")
ref("ctx_delivery_dropped"),
ref("ctx_schema_validation_failed")
]),
#{
desc => ?DESC("test_context"),

View File

@ -894,7 +894,7 @@ test_columns('client.connack') ->
[
{<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]},
{<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]},
{<<"reason_code">>, [<<"sucess">>, <<"the reason code">>]}
{<<"reason_code">>, [<<"success">>, <<"the reason code">>]}
];
test_columns('client.check_authz_complete') ->
[

View File

@ -70,6 +70,7 @@ apply_rule_discard_result(Rule, Columns, Envs) ->
ok.
apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
PrevProcessMetadata = logger:get_process_metadata(),
set_process_trace_metadata(RuleID, Columns),
trace_rule_sql(
"rule_activated",
@ -137,21 +138,26 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
),
{error, {Error, StkTrace}}
after
reset_process_trace_metadata(Columns)
reset_logger_process_metadata(PrevProcessMetadata)
end.
set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
logger:update_process_metadata(#{
clientid => ClientID,
rule_id => RuleID,
rule_trigger_time => rule_trigger_time(Columns)
rule_trigger_ts => [rule_trigger_time(Columns)]
});
set_process_trace_metadata(RuleID, Columns) ->
logger:update_process_metadata(#{
rule_id => RuleID,
rule_trigger_time => rule_trigger_time(Columns)
rule_trigger_ts => [rule_trigger_time(Columns)]
}).
reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
logger:unset_process_metadata();
reset_logger_process_metadata(PrevProcessMetadata) ->
logger:set_process_metadata(PrevProcessMetadata).
rule_trigger_time(Columns) ->
case Columns of
#{timestamp := Timestamp} ->
@ -160,18 +166,6 @@ rule_trigger_time(Columns) ->
erlang:system_time(millisecond)
end.
reset_process_trace_metadata(#{clientid := _ClientID}) ->
Meta = logger:get_process_metadata(),
Meta1 = maps:remove(clientid, Meta),
Meta2 = maps:remove(rule_id, Meta1),
Meta3 = maps:remove(rule_trigger_time, Meta2),
logger:set_process_metadata(Meta3);
reset_process_trace_metadata(_) ->
Meta = logger:get_process_metadata(),
Meta1 = maps:remove(rule_id, Meta),
Meta2 = maps:remove(rule_trigger_time, Meta1),
logger:set_process_metadata(Meta2).
do_apply_rule(
#{
id := RuleId,
@ -528,30 +522,40 @@ do_handle_action_get_trace_inc_metrics_context(RuleID, Action) ->
end.
do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) ->
StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false),
StopAfterRenderMap =
case maps:get(stop_action_after_render, TraceMeta, false) of
false ->
#{};
true ->
#{stop_action_after_render => true}
end,
case TraceMeta of
#{
rule_id := RuleID,
clientid := ClientID,
rule_trigger_time := Timestamp
rule_trigger_ts := Timestamp
} ->
#{
rule_id => RuleID,
clientid => ClientID,
action_id => Action,
stop_action_after_render => StopAfterRender,
rule_trigger_time => Timestamp
};
maps:merge(
#{
rule_id => RuleID,
clientid => ClientID,
action_id => Action,
rule_trigger_ts => Timestamp
},
StopAfterRenderMap
);
#{
rule_id := RuleID,
rule_trigger_time := Timestamp
rule_trigger_ts := Timestamp
} ->
#{
rule_id => RuleID,
action_id => Action,
stop_action_after_render => StopAfterRender,
rule_trigger_time => Timestamp
}
maps:merge(
#{
rule_id => RuleID,
action_id => Action,
rule_trigger_ts => Timestamp
},
StopAfterRenderMap
)
end.
action_info({bridge, BridgeType, BridgeName, _ResId}) ->
@ -740,7 +744,20 @@ nested_put(Alias, Val, Columns0) ->
emqx_rule_maps:nested_put(Alias, Val, Columns).
inc_action_metrics(TraceCtx, Result) ->
_ = do_inc_action_metrics(TraceCtx, Result),
SavedMetaData = logger:get_process_metadata(),
try
%% To not pollute the trace we temporary remove the process meta data
logger:unset_process_metadata(),
_ = do_inc_action_metrics(TraceCtx, Result)
after
%% Setting process metadata to undefined yields an error
case SavedMetaData of
undefined ->
ok;
_ ->
logger:set_process_metadata(SavedMetaData)
end
end,
Result.
do_inc_action_metrics(

View File

@ -52,7 +52,8 @@ do_apply_rule(
do_apply_matched_rule(
Rule,
Context,
StopAfterRender
StopAfterRender,
EventTopics
);
false ->
{error, nomatch}
@ -61,21 +62,29 @@ do_apply_rule(
case lists:member(InTopic, EventTopics) of
true ->
%% the rule is for both publish and events, test it directly
do_apply_matched_rule(Rule, Context, StopAfterRender);
do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics);
false ->
{error, nomatch}
end
end.
do_apply_matched_rule(Rule, Context, StopAfterRender) ->
update_process_trace_metadata(StopAfterRender),
ApplyRuleRes = emqx_rule_runtime:apply_rule(
Rule,
Context,
apply_rule_environment()
),
reset_trace_process_metadata(StopAfterRender),
ApplyRuleRes.
do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics) ->
PrevLoggerProcessMetadata = logger:get_process_metadata(),
try
update_process_trace_metadata(StopAfterRender),
FullContext = fill_default_values(
hd(EventTopics),
emqx_rule_maps:atom_key_map(Context)
),
ApplyRuleRes = emqx_rule_runtime:apply_rule(
Rule,
FullContext,
apply_rule_environment()
),
ApplyRuleRes
after
reset_logger_process_metadata(PrevLoggerProcessMetadata)
end.
update_process_trace_metadata(true = _StopAfterRender) ->
logger:update_process_metadata(#{
@ -84,12 +93,10 @@ update_process_trace_metadata(true = _StopAfterRender) ->
update_process_trace_metadata(false = _StopAfterRender) ->
ok.
reset_trace_process_metadata(true = _StopAfterRender) ->
Meta = logger:get_process_metadata(),
NewMeta = maps:remove(stop_action_after_render, Meta),
logger:set_process_metadata(NewMeta);
reset_trace_process_metadata(false = _StopAfterRender) ->
ok.
reset_logger_process_metadata(undefined = _PrevProcessMetadata) ->
logger:unset_process_metadata();
reset_logger_process_metadata(PrevProcessMetadata) ->
logger:set_process_metadata(PrevProcessMetadata).
%% At the time of writing the environment passed to the apply rule function is
%% not used at all for normal actions. When it is used for custom functions it
@ -197,6 +204,8 @@ is_test_runtime_env() ->
%% Most events have the original `topic' input, but their own topic (i.e.: `$events/...')
%% is different from `topic'.
get_in_topic(#{event_type := schema_validation_failed}) ->
<<"$events/schema_validation_failed">>;
get_in_topic(Context) ->
case maps:find(event_topic, Context) of
{ok, EventTopic} ->

View File

@ -243,7 +243,7 @@ t_rule_test_smoke(_Config) ->
#{
<<"clientid">> => <<"c_emqx">>,
<<"event_type">> => <<"client_connack">>,
<<"reason_code">> => <<"sucess">>,
<<"reason_code">> => <<"success">>,
<<"username">> => <<"u_emqx">>
},
<<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">>

View File

@ -216,18 +216,15 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
end
)
end,
%% Check that rule_trigger_time meta field is present in all log entries
%% Check that rule_trigger_ts meta field is present in all log entries
Log0 = read_rule_trace_file(TraceName, TraceType, Now),
Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
Log2 = lists:join(<<",\n">>, Log1),
Log3 = iolist_to_binary(["[", Log2, "]"]),
{ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
[#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
[#{<<"meta">> := #{<<"rule_trigger_ts">> := [RuleTriggerTime]}} | _] = LogEntries,
[
?assert(
(maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
)
?assert(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_ts">>, Meta, [])))
|| #{<<"meta">> := Meta} <- LogEntries
],
ok.
@ -265,8 +262,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
<<"result">> := <<"ok">>
},
<<"rule_id">> := _,
<<"rule_trigger_time">> := _,
<<"stop_action_after_render">> := false,
<<"rule_trigger_ts">> := _,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_success">>,
@ -360,9 +356,10 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
ok;
CheckBatchesFunRec(CurCount) ->
receive
[{_, #{<<"stop_after_render">> := StopValue}} | _] = List ->
[{_, FirstMsg} | _] = List ->
StopValue = maps:get(<<"stop_after_render">>, FirstMsg, false),
[
?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg)
?assertEqual(StopValue, maps:get(<<"stop_after_render">>, Msg, false))
|| {_, Msg} <- List
],
Len = length(List),
@ -419,21 +416,20 @@ t_apply_rule_test_format_action_failed(_Config) ->
<<"name">> := _,
<<"type">> := <<"rule_engine_test">>
},
<<"client_ids">> := [],
<<"clientid">> := _,
<<"reason">> := <<"MY REASON">>,
<<"rule_id">> := _,
<<"rule_ids">> := [],
<<"rule_trigger_time">> := _,
<<"rule_trigger_times">> := [],
<<"stop_action_after_render">> := false,
<<"rule_trigger_ts">> := _,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
<<"time">> := _
},
LastEntryJSON
)
),
MetaMap = maps:get(<<"meta">>, LastEntryJSON),
?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
end,
do_apply_rule_test_format_action_failed_test(1, CheckFun).
@ -494,8 +490,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
<<"clientid">> := _,
<<"reason">> := <<"request_expired">>,
<<"rule_id">> := _,
<<"rule_trigger_time">> := _,
<<"stop_action_after_render">> := false,
<<"rule_trigger_ts">> := _,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
@ -511,7 +506,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
<<"level">> := <<"debug">>,
<<"meta">> :=
#{
<<"client_ids">> := [],
<<"clientid">> := _,
<<"id">> := _,
<<"reason">> :=
@ -521,17 +515,17 @@ out_of_service_check_fun(SendErrorMsg, Reason) ->
<<"msg">> := <<"MY_RECOVERABLE_REASON">>
},
<<"rule_id">> := _,
<<"rule_ids">> := [],
<<"rule_trigger_time">> := _,
<<"rule_trigger_times">> := [],
<<"stop_action_after_render">> := false,
<<"rule_trigger_ts">> := _,
<<"trace_tag">> := <<"ERROR">>
},
<<"msg">> := SendErrorMsg,
<<"time">> := _
},
ReasonEntryJSON
)
),
MetaMap = maps:get(<<"meta">>, ReasonEntryJSON),
?assert(not maps:is_key(<<"client_ids">>, MetaMap)),
?assert(not maps:is_key(<<"rule_ids">>, MetaMap))
end.
meck_test_connector_recoverable_errors(Reason) ->

View File

@ -165,7 +165,7 @@ t_ctx_connack(_) ->
clean_start => true,
clientid => <<"c_emqx">>,
event_type => client_connack,
reason_code => <<"sucess">>,
reason_code => <<"success">>,
username => <<"u_emqx">>
},
Expected = check_result([clientid, username, reason_code], [node], Context),
@ -237,6 +237,21 @@ t_ctx_delivery_dropped(_) ->
Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context),
do_test(SQL, Context, Expected).
t_ctx_schema_validation_failed(_) ->
SQL =
<<"SELECT validation FROM \"$events/schema_validation_failed\"">>,
Context = #{
<<"clientid">> => <<"c_emqx">>,
<<"event_type">> => <<"schema_validation_failed">>,
<<"payload">> => <<"{\"msg\": \"hello\"}">>,
<<"qos">> => 1,
<<"topic">> => <<"t/a">>,
<<"username">> => <<"u_emqx">>,
<<"validation">> => <<"m">>
},
Expected = check_result([validation], [], Context),
do_test(SQL, Context, Expected).
t_mongo_date_function_should_return_string_in_test_env(_) ->
SQL =
<<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,

View File

@ -35,6 +35,10 @@
%% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]).
%% `emqx_config_backup' API
-behaviour(emqx_config_backup).
-export([import_config/1]).
%% Internal exports
-export([parse_sql_check/1]).
@ -49,6 +53,7 @@
-define(TRACE_TAG, "SCHEMA_VALIDATION").
-define(CONF_ROOT, schema_validation).
-define(CONF_ROOT_BIN, <<"schema_validation">>).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary().
@ -60,12 +65,14 @@
-spec add_handler() -> ok.
add_handler() ->
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
ok.
-spec remove_handler() -> ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
ok = emqx_config_handler:remove_handler([?CONF_ROOT]),
ok.
load() ->
@ -180,7 +187,12 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations)
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
delete(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
reorder(OldValidations, Order).
reorder(OldValidations, Order);
pre_config_update([?CONF_ROOT], {merge, NewConfig}, OldConfig) ->
#{resulting_config := Config} = prepare_config_merge(NewConfig, OldConfig),
{ok, Config};
pre_config_update([?CONF_ROOT], {replace, NewConfig}, _OldConfig) ->
{ok, NewConfig}.
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name),
@ -197,7 +209,81 @@ post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs)
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
ok = emqx_schema_validation_registry:reindex_positions(New),
ok.
ok;
post_config_update([?CONF_ROOT], {merge, _}, ResultingConfig, Old, _AppEnvs) ->
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
#{added := NewValidations0} =
emqx_utils:diff_lists(
ResultingValidations,
OldValidations,
fun(#{name := N}) -> N end
),
NewValidations =
lists:map(
fun(#{name := Name}) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation),
#{name => Name, pos => Pos}
end,
NewValidations0
),
{ok, #{new_validations => NewValidations}};
post_config_update([?CONF_ROOT], {replace, Input}, ResultingConfig, Old, _AppEnvs) ->
#{
new_validations := NewValidations,
changed_validations := ChangedValidations0,
deleted_validations := DeletedValidations
} = prepare_config_replace(Input, Old),
#{validations := ResultingValidations} = ResultingConfig,
#{validations := OldValidations} = Old,
lists:foreach(
fun(Name) ->
{_Pos, Validation} = fetch_with_index(OldValidations, Name),
ok = emqx_schema_validation_registry:delete(Validation)
end,
DeletedValidations
),
lists:foreach(
fun(Name) ->
{Pos, Validation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:insert(Pos, Validation)
end,
NewValidations
),
ChangedValidations =
lists:map(
fun(Name) ->
{_Pos, OldValidation} = fetch_with_index(OldValidations, Name),
{Pos, NewValidation} = fetch_with_index(ResultingValidations, Name),
ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
#{name => Name, pos => Pos}
end,
ChangedValidations0
),
ok = emqx_schema_validation_registry:reindex_positions(ResultingValidations),
{ok, #{changed_validations => ChangedValidations}}.
%%------------------------------------------------------------------------------
%% `emqx_config_backup' API
%%------------------------------------------------------------------------------
import_config(#{?CONF_ROOT_BIN := RawConf0}) ->
Result = emqx_conf:update(
[?CONF_ROOT],
{merge, RawConf0},
#{override_to => cluster, rawconf_with_defaults => true}
),
case Result of
{error, Reason} ->
{error, #{root_key => ?CONF_ROOT, reason => Reason}};
{ok, _} ->
Keys0 = maps:keys(RawConf0),
ChangedPaths = Keys0 -- [<<"validations">>],
{ok, #{root_key => ?CONF_ROOT, changed => ChangedPaths}}
end;
import_config(_RawConf) ->
{ok, #{root_key => ?CONF_ROOT, changed => []}}.
%%------------------------------------------------------------------------------
%% Internal exports
@ -471,3 +557,55 @@ run_schema_validation_failed_hook(Message, Validation) ->
#{name := Name} = Validation,
ValidationContext = #{name => Name},
emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).
%% "Merging" in the context of the validation array means:
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
prepare_config_merge(NewConfig0, OldConfig) ->
{ImportedRawValidations, NewConfigNoValidations} =
case maps:take(<<"validations">>, NewConfig0) of
error ->
{[], NewConfig0};
{V, R} ->
{V, R}
end,
OldRawValidations = maps:get(<<"validations">>, OldConfig, []),
#{added := NewRawValidations} = emqx_utils:diff_lists(
ImportedRawValidations,
OldRawValidations,
fun(#{<<"name">> := N}) -> N end
),
Config0 = emqx_utils_maps:deep_merge(OldConfig, NewConfigNoValidations),
Config = maps:update_with(
<<"validations">>,
fun(OldVs) -> OldVs ++ NewRawValidations end,
NewRawValidations,
Config0
),
#{
new_validations => NewRawValidations,
resulting_config => Config
}.
prepare_config_replace(NewConfig, OldConfig) ->
ImportedRawValidations = maps:get(<<"validations">>, NewConfig, []),
OldValidations = maps:get(validations, OldConfig, []),
%% Since, at this point, we have an input raw config but a parsed old config, we
%% project both to the to have only their names, and consider common names as changed.
#{
added := NewValidations,
removed := DeletedValidations,
changed := ChangedValidations0,
identical := ChangedValidations1
} = emqx_utils:diff_lists(
lists:map(fun(#{<<"name">> := N}) -> N end, ImportedRawValidations),
lists:map(fun(#{name := N}) -> N end, OldValidations),
fun(N) -> N end
),
#{
new_validations => NewValidations,
changed_validations => ChangedValidations0 ++ ChangedValidations1,
deleted_validations => DeletedValidations
}.

View File

@ -229,6 +229,29 @@ monitor_metrics() ->
ct:pal("monitor metrics result:\n ~p", [Res]),
simplify_result(Res).
upload_backup(BackupFilePath) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "files"]),
Res = emqx_mgmt_api_test_util:upload_request(
Path,
BackupFilePath,
"filename",
<<"application/octet-stream">>,
[],
emqx_mgmt_api_test_util:auth_header_()
),
simplify_result(Res).
export_backup() ->
Path = emqx_mgmt_api_test_util:api_path(["data", "export"]),
Res = request(post, Path, []),
simplify_result(Res).
import_backup(BackupName) ->
Path = emqx_mgmt_api_test_util:api_path(["data", "import"]),
Body = #{<<"filename">> => unicode:characters_to_binary(BackupName)},
Res = request(post, Path, Body),
simplify_result(Res).
connect(ClientId) ->
connect(ClientId, _IsPersistent = false).
@ -438,6 +461,12 @@ assert_monitor_metrics() ->
),
ok.
normalize_validations(RawValidations) ->
[
V#{<<"topics">> := [T]}
|| #{<<"topics">> := T} = V <- RawValidations
].
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -1216,3 +1245,142 @@ t_schema_check_protobuf(_Config) ->
),
ok.
%% Tests that restoring a backup config works.
%% * Existing validations (identified by `name') are left untouched.
%% * No validations are removed.
%% * New validations are appended to the existing list.
%% * Existing validations are not reordered.
t_import_config_backup(_Config) ->
%% Setup backup file.
%% Will clash with existing validation; different order.
Name2 = <<"2">>,
Check2B = sql_check(<<"select 2 where false">>),
Validation2B = validation(Name2, [Check2B]),
{201, _} = insert(Validation2B),
%% Will clash with existing validation.
Name1 = <<"1">>,
Check1B = sql_check(<<"select 1 where false">>),
Validation1B = validation(Name1, [Check1B]),
{201, _} = insert(Validation1B),
%% New validation; should be appended
Name4 = <<"4">>,
Check4 = sql_check(<<"select 4 where true">>),
Validation4 = validation(Name4, [Check4]),
{201, _} = insert(Validation4),
{200, #{<<"filename">> := BackupName}} = export_backup(),
%% Clear this setup and pretend we have other data to begin with.
clear_all_validations(),
{200, []} = list(),
Check1A = sql_check(<<"select 1 where true">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
Check2A = sql_check(<<"select 2 where true">>),
Validation2A = validation(Name2, [Check2A]),
{201, _} = insert(Validation2A),
Name3 = <<"3">>,
Check3 = sql_check(<<"select 3 where true">>),
Validation3 = validation(Name3, [Check3]),
{201, _} = insert(Validation3),
{204, _} = import_backup(BackupName),
ExpectedValidations = normalize_validations([
Validation1A,
Validation2A,
Validation3,
Validation4
]),
?assertMatch({200, ExpectedValidations}, list()),
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
ok.
%% Tests that importing configurations from the CLI interface work.
t_load_config(_Config) ->
Name1 = <<"1">>,
Check1A = sql_check(<<"select 1 where true">>),
Validation1A = validation(Name1, [Check1A]),
{201, _} = insert(Validation1A),
Name2 = <<"2">>,
Check2A = sql_check(<<"select 2 where true">>),
Validation2A = validation(Name2, [Check2A]),
{201, _} = insert(Validation2A),
Name3 = <<"3">>,
Check3 = sql_check(<<"select 3 where true">>),
Validation3 = validation(Name3, [Check3]),
{201, _} = insert(Validation3),
%% Config to load
%% Will replace existing config
Check2B = sql_check(<<"select 2 where false">>),
Validation2B = validation(Name2, [Check2B]),
%% Will replace existing config
Check1B = sql_check(<<"select 1 where false">>),
Validation1B = validation(Name1, [Check1B]),
%% New validation; should be appended
Name4 = <<"4">>,
Check4 = sql_check(<<"select 4 where true">>),
Validation4 = validation(Name4, [Check4]),
ConfRootBin = <<"schema_validation">>,
ConfigToLoad1 = #{
ConfRootBin => #{
<<"validations">> => [Validation2B, Validation1B, Validation4]
}
},
ConfigToLoadBin1 = iolist_to_binary(hocon_pp:do(ConfigToLoad1, #{})),
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin1, #{mode => merge})),
ExpectedValidations1 = normalize_validations([
Validation1A,
Validation2A,
Validation3,
Validation4
]),
?assertMatch(
#{
ConfRootBin := #{
<<"validations">> := ExpectedValidations1
}
},
emqx_conf_cli:get_config(<<"schema_validation">>)
),
?assertIndexOrder([Name1, Name2, Name3, Name4], <<"t/a">>),
%% Replace
Check4B = sql_check(<<"select 4, true where true">>),
Validation4B = validation(Name4, [Check4B]),
Name5 = <<"5">>,
Check5 = sql_check(<<"select 5 where true">>),
Validation5 = validation(Name5, [Check5]),
ConfigToLoad2 = #{
ConfRootBin => #{<<"validations">> => [Validation4B, Validation3, Validation5]}
},
ConfigToLoadBin2 = iolist_to_binary(hocon_pp:do(ConfigToLoad2, #{})),
?assertMatch(ok, emqx_conf_cli:load_config(ConfigToLoadBin2, #{mode => replace})),
ExpectedValidations2 = normalize_validations([Validation4B, Validation3, Validation5]),
?assertMatch(
#{
ConfRootBin := #{
<<"validations">> := ExpectedValidations2
}
},
emqx_conf_cli:get_config(<<"schema_validation">>)
),
?assertIndexOrder([Name4, Name3, Name5], <<"t/a">>),
ok.

View File

@ -751,7 +751,6 @@ safe_filename(Filename) when is_list(Filename) ->
when
Func :: fun((T) -> any()),
T :: any().
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
Removed =
lists:foldl(

View File

@ -1,2 +1,2 @@
Make it possible to override `session_persistence` settings per zone.
Make it possible to override `durable_sessions` settings per zone.
Since durable sessions are inherently more expensive to maintain than the regular sessions, it's desirable to grant the operator finer control of session durability for different classes of clients.

View File

@ -0,0 +1,3 @@
Improve HTTP authentication error log message.
If HTTP content-type header is missing for POST method, it now emits a meaningful error message instead of a less readable exception with stack trace.

View File

@ -0,0 +1,2 @@
- Rename durable storage for MQTT messages from `emqx_persistent_message` to `messages`
- Rename configuration root from `session_persistence` to `durable_sessions`

View File

@ -0,0 +1 @@
Rename configuration parameter `durable_sessions.last_alive_update_interval` to `durable_sessions.heartbeat_interval`.

View File

@ -0,0 +1 @@
Adds a new `durable_subscriptions.count` statistic to track subscriptions that are tied to durable sessions. `subscriptions.count` does not include such subscriptions.

View File

@ -1,64 +0,0 @@
emqx_bridge_s3_aggreg_upload {
s3_aggregated_upload.label:
"""S3 Aggregated Upload"""
s3_aggregated_upload.desc:
"""Action that enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
s3_aggregated_upload_parameters.label:
"""S3 Aggregated Upload action parameters"""
s3_aggregated_upload_parameters.desc:
"""Set of parameters for the aggregated upload action."""
s3_aggregation.label:
"""Aggregation parameters"""
s3_aggregation.desc:
"""Set of parameters governing the aggregation process."""
s3_aggregation_interval.label:
"""Time interval"""
s3_aggregation_interval.desc:
"""Amount of time events will be aggregated in a single object before uploading."""
s3_aggregation_max_records.label:
"""Maximum number of records"""
s3_aggregation_max_records.desc:
"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key."""
s3_aggregated_container.label:
"""Container for aggregated events"""
s3_aggregated_container.desc:
"""Settings governing the file format of an upload containing aggregated events."""
s3_aggregated_container_csv.label:
"""CSV container"""
s3_aggregated_container_csv.desc:
"""Records (events) will be aggregated and uploaded as a CSV file."""
s3_aggregated_container_csv_column_order.label:
"""CSV column order"""
s3_aggregated_container_csv_column_order.desc:
"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
Regardless of this setting, resulting CSV will contain all the fields of aggregated events, but all the columns not explicitly mentioned here will be ordered after the ones listed here in the lexicographical order."""
s3_aggregated_upload_key.label:
"""S3 object key template"""
s3_aggregated_upload_key.desc:
"""Template for the S3 object key of an aggregated upload.<br/>
Template may contain placeholders for the following variables:
<ul>
<li><code>${action}</code>: name of the action (required).<li/>
<li><code>${node}</code>: name of the EMQX node conducting the upload (required).<li/>
<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
<ul>
<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,<li/>
<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,<li/>
<li><code>${datetime.unix}</code>: Unix timestamp.<li/>
</ul>
<li/>
<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.<li/>
<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).<li/>
</ul>
All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
}

View File

@ -1,13 +1,23 @@
emqx_bridge_s3_upload {
s3_upload.label:
"""S3 Simple Upload"""
"""Upload to S3"""
s3_upload.desc:
"""Action to upload a single object to the S3 service."""
"""Action that takes incoming events and uploads them to the S3 API compatible service."""
s3_upload_parameters.label:
"""S3 Upload action parameters"""
s3_upload_parameters.desc:
s3_parameters.label:
"""S3 Upload parameters"""
s3_parameters.desc:
"""Set of parameters for the upload action."""
s3_direct_upload_mode.label:
"""Direct S3 Upload"""
s3_direct_upload_mode.desc:
"""Enables uploading of events to the S3 service as separate objects."""
s3_direct_upload_parameters.label:
"""Direct S3 Upload action parameters"""
s3_direct_upload_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
s3_object_content.label:
@ -15,4 +25,66 @@ s3_object_content.label:
s3_object_content.desc:
"""Content of the S3 object being uploaded. Supports templates."""
s3_aggregated_upload_mode.label:
"""Aggregated S3 Upload"""
s3_aggregated_upload_mode.desc:
"""Enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
s3_aggregated_upload_parameters.label:
"""Aggregated S3 Upload action parameters"""
s3_aggregated_upload_parameters.desc:
"""Set of parameters for the aggregated upload action."""
s3_aggregation.label:
"""Aggregation parameters"""
s3_aggregation.desc:
"""Set of parameters governing the aggregation process."""
s3_aggregation_interval.label:
"""Time interval"""
s3_aggregation_interval.desc:
"""Amount of time events will be aggregated in a single object before uploading."""
s3_aggregation_max_records.label:
"""Maximum number of records"""
s3_aggregation_max_records.desc:
"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key."""
s3_aggregated_container.label:
"""Container for aggregated events"""
s3_aggregated_container.desc:
"""Settings governing the file format of an upload containing aggregated events."""
s3_aggregated_container_csv.label:
"""CSV container"""
s3_aggregated_container_csv.desc:
"""Records (events) will be aggregated and uploaded as a CSV file."""
s3_aggregated_container_csv_column_order.label:
"""CSV column order"""
s3_aggregated_container_csv_column_order.desc:
"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
Regardless of this setting, resulting CSV will contain all the fields of aggregated events, but all the columns not explicitly mentioned here will be ordered after the ones listed here in the lexicographical order."""
s3_aggregated_upload_key.label:
"""S3 object key template"""
s3_aggregated_upload_key.desc:
"""Template for the S3 object key of an aggregated upload.<br/>
Template may contain placeholders for the following variables:
<ul>
<li><code>${action}</code>: name of the action (required).</li>
<li><code>${node}</code>: name of the EMQX node conducting the upload (required).</li>
<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
<ul>
<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,</li>
<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,</li>
<li><code>${datetime.unix}</code>: Unix timestamp.</li>
</ul>
</li>
<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.</li>
<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).</li>
</ul>
All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
}

View File

@ -360,6 +360,12 @@ event_username.desc:
event_username.label:
"""Username"""
event_validation.desc:
"""Validation"""
event_validation.label:
"""Validation"""
root_rule_info.desc:
"""Schema for rule info"""

View File

@ -1250,7 +1250,7 @@ base_listener_zone.desc: """~
- `force_shutdown`
- `force_gc`
- `flapping_detect`
- `session_persistence`"""
- `durable_sessions`"""
base_listener_zone.label: "Zone"
@ -1587,10 +1587,10 @@ resource_tags.label:
resource_tags.desc:
"""Tags to annotate this config entry."""
session_persistence_enable.label:
durable_sessions_enable.label:
"""Enable session persistence"""
session_persistence_enable.desc:
durable_sessions_enable.desc:
"""Use durable storage for client sessions persistence.
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.

View File

@ -23,6 +23,27 @@ parse_semver() {
echo "$1" | tr '.|-' ' '
}
is_allowed_non_strict() {
local src_file="$1"
local from="$2"
local to="$3"
case "$(basename "${src_file}" '.app.src')" in
emqx_auth_http)
case "${from}-${to}" in
'0.1.4-0.2.1')
return 0
;;
*)
return 1
;;
esac
;;
*)
return 1
;;
esac
}
APPS="$(./scripts/find-apps.sh)"
for app in ${APPS}; do
if [ "$app" != "emqx" ]; then
@ -70,8 +91,10 @@ for app in ${APPS}; do
[ "${now_app_version_semver[2]}" = "0" ]; then
true
else
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"
bad_app_count=$(( bad_app_count + 1))
if ! is_allowed_non_strict "$src_file" "$old_app_version" "$now_app_version"; then
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"
bad_app_count=$(( bad_app_count + 1))
fi
fi
fi
done