diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 9997055dc..1a1bac140 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -27,6 +27,7 @@ {emqx_prometheus,1}. {emqx_resource,1}. {emqx_retainer,1}. +{emqx_retainer,2}. {emqx_rule_engine,1}. {emqx_shared_sub,1}. {emqx_slow_subs,1}. diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index d13fda30a..d91b32da0 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 30d56f257..3fa781e6d 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -46,16 +46,32 @@ authenticate(Credential) -> NotSuperUser = #{is_superuser => false}, case emqx_authentication:pre_hook_authenticate(Credential) of ok -> + inc_authn_metrics(anonymous), {ok, NotSuperUser}; continue -> - case run_hooks('client.authenticate', [Credential], {ok, #{is_superuser => false}}) of - ok -> + case run_hooks('client.authenticate', [Credential], ignore) of + ignore -> + inc_authn_metrics(anonymous), {ok, NotSuperUser}; + ok -> + inc_authn_metrics(ok), + {ok, NotSuperUser}; + {ok, _AuthResult} = OkResult -> + inc_authn_metrics(ok), + OkResult; + {ok, _AuthResult, _AuthData} = OkResult -> + inc_authn_metrics(ok), + OkResult; + {error, _Reason} = Error -> + inc_authn_metrics(error), + Error; + %% {continue, AuthCache} | {continue, AuthData, AuthCache} Other -> Other end; - Other -> - Other + {error, _Reason} = Error -> + inc_authn_metrics(error), + Error end. %% @doc Check Authorization @@ -134,3 +150,11 @@ inc_authz_metrics(deny) -> emqx_metrics:inc('authorization.deny'); inc_authz_metrics(cache_hit) -> emqx_metrics:inc('authorization.cache_hit'). + +inc_authn_metrics(error) -> + emqx_metrics:inc('authentication.failure'); +inc_authn_metrics(ok) -> + emqx_metrics:inc('authentication.success'); +inc_authn_metrics(anonymous) -> + emqx_metrics:inc('authentication.success.anonymous'), + emqx_metrics:inc('authentication.success'). diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index 749f5bfd7..ffce81787 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -228,7 +228,6 @@ when -spec pre_hook_authenticate(emqx_types:clientinfo()) -> ok | continue | {error, not_authorized}. pre_hook_authenticate(#{enable_authn := false}) -> - inc_authenticate_metric('authentication.success.anonymous'), ?TRACE_RESULT("authentication_result", ok, enable_authn_false); pre_hook_authenticate(#{enable_authn := quick_deny_anonymous} = Credential) -> case maps:get(username, Credential, undefined) of @@ -242,29 +241,18 @@ pre_hook_authenticate(#{enable_authn := quick_deny_anonymous} = Credential) -> pre_hook_authenticate(_) -> continue. -authenticate(#{listener := Listener, protocol := Protocol} = Credential, _AuthResult) -> +authenticate(#{listener := Listener, protocol := Protocol} = Credential, AuthResult) -> case get_authenticators(Listener, global_chain(Protocol)) of {ok, ChainName, Authenticators} -> case get_enabled(Authenticators) of [] -> - inc_authenticate_metric('authentication.success.anonymous'), - ?TRACE_RESULT("authentication_result", ignore, empty_chain); + ?TRACE_RESULT("authentication_result", AuthResult, empty_chain); NAuthenticators -> Result = do_authenticate(ChainName, NAuthenticators, Credential), - - case Result of - {stop, {ok, _}} -> - inc_authenticate_metric('authentication.success'); - {stop, {error, _}} -> - inc_authenticate_metric('authentication.failure'); - _ -> - ok - end, ?TRACE_RESULT("authentication_result", Result, chain_result) end; none -> - inc_authenticate_metric('authentication.success.anonymous'), - ?TRACE_RESULT("authentication_result", ignore, no_chain) + ?TRACE_RESULT("authentication_result", AuthResult, no_chain) end. get_authenticators(Listener, Global) -> @@ -649,7 +637,7 @@ handle_create_authenticator(Chain, Config, Providers) -> end. do_authenticate(_ChainName, [], _) -> - {stop, {error, not_authorized}}; + {ok, {error, not_authorized}}; do_authenticate( ChainName, [#authenticator{id = ID} = Authenticator | More], Credential ) -> @@ -673,7 +661,7 @@ do_authenticate( _ -> ok end, - {stop, Result} + {ok, Result} catch Class:Reason:Stacktrace -> ?TRACE_AUTHN(warning, "authenticator_error", #{ @@ -947,9 +935,3 @@ to_list(M) when is_map(M) -> [M]; to_list(L) when is_list(L) -> L. call(Call) -> gen_server:call(?MODULE, Call, infinity). - -inc_authenticate_metric('authentication.success.anonymous' = Metric) -> - emqx_metrics:inc(Metric), - emqx_metrics:inc('authentication.success'); -inc_authenticate_metric(Metric) -> - emqx_metrics:inc(Metric). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index a0f2b1e7d..4a6ea2046 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -199,6 +199,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> Reason =:= listener_disabled; Reason =:= quic_app_missing -> + ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), console_print( "Listener ~ts is NOT started due to: ~p.~n", [listener_id(Type, ListenerName), Reason] @@ -212,8 +213,12 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ), ok; {error, {already_started, Pid}} -> + ?tp(listener_not_started, #{ + type => Type, bind => Bind, status => {already_started, Pid} + }), {error, {already_started, Pid}}; {error, Reason} -> + ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), ListenerId = listener_id(Type, ListenerName), BindStr = format_bind(Bind), ?ELOG( diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl index 61b4b2775..7016a8a00 100644 --- a/apps/emqx/test/emqx_authentication_SUITE.erl +++ b/apps/emqx/test/emqx_authentication_SUITE.erl @@ -22,6 +22,8 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx_hooks.hrl"). + -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -35,6 +37,20 @@ end)() ). -define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). +-define(NOT_SUPERUSER, #{is_superuser => false}). + +-define(assertAuthSuccessForUser(User), + ?assertMatch( + {ok, _}, + emqx_access_control:authenticate(ClientInfo#{username => atom_to_binary(User)}) + ) +). +-define(assertAuthFailureForUser(User), + ?assertMatch( + {error, _}, + emqx_access_control:authenticate(ClientInfo#{username => atom_to_binary(User)}) + ) +). %%------------------------------------------------------------------------------ %% Hocon Schema @@ -88,9 +104,22 @@ update(_Config, _State) -> authenticate(#{username := <<"good">>}, _State) -> {ok, #{is_superuser => true}}; +authenticate(#{username := <<"ignore">>}, _State) -> + ignore; authenticate(#{username := _}, _State) -> {error, bad_username_or_password}. +hook_authenticate(#{username := <<"hook_user_good">>}, _AuthResult) -> + {ok, {ok, ?NOT_SUPERUSER}}; +hook_authenticate(#{username := <<"hook_user_bad">>}, _AuthResult) -> + {ok, {error, invalid_username}}; +hook_authenticate(#{username := <<"hook_user_finally_good">>}, _AuthResult) -> + {stop, {ok, ?NOT_SUPERUSER}}; +hook_authenticate(#{username := <<"hook_user_finally_bad">>}, _AuthResult) -> + {stop, {error, invalid_username}}; +hook_authenticate(_ClientId, AuthResult) -> + {ok, AuthResult}. + destroy(_State) -> ok. @@ -113,6 +142,10 @@ end_per_testcase(Case, Config) -> _ = ?MODULE:Case({'end', Config}), ok. +%%================================================================================= +%% Testcases +%%================================================================================= + t_chain({'init', Config}) -> Config; t_chain(Config) when is_list(Config) -> @@ -500,6 +533,92 @@ t_convert_certs(Config) when is_list(Config) -> clear_certs(CertsDir, #{<<"ssl">> => NCerts3}), ?assertEqual(false, filelib:is_regular(maps:get(<<"keyfile">>, NCerts3))). +t_combine_authn_and_callback({init, Config}) -> + [ + {listener_id, 'tcp:default'}, + {authn_type, {password_based, built_in_database}} + | Config + ]; +t_combine_authn_and_callback(Config) when is_list(Config) -> + ListenerID = ?config(listener_id), + ClientInfo = #{ + zone => default, + listener => ListenerID, + protocol => mqtt, + password => <<"any">> + }, + + %% no emqx_authentication authenticators, anonymous is allowed + ?assertAuthSuccessForUser(bad), + + AuthNType = ?config(authn_type), + register_provider(AuthNType, ?MODULE), + + AuthenticatorConfig = #{ + mechanism => password_based, + backend => built_in_database, + enable => true + }, + {ok, _} = ?AUTHN:create_authenticator(ListenerID, AuthenticatorConfig), + + %% emqx_authentication alone + ?assertAuthSuccessForUser(good), + ?assertAuthFailureForUser(ignore), + ?assertAuthFailureForUser(bad), + + %% add hook with higher priority + ok = hook(?HP_AUTHN + 1), + + %% for hook unrelataed users everything is the same + ?assertAuthSuccessForUser(good), + ?assertAuthFailureForUser(ignore), + ?assertAuthFailureForUser(bad), + + %% higher-priority hook can permit access with {ok,...}, + %% then emqx_authentication overrides the result + ?assertAuthFailureForUser(hook_user_good), + ?assertAuthFailureForUser(hook_user_bad), + + %% higher-priority hook can permit and return {stop,...}, + %% then emqx_authentication cannot override the result + ?assertAuthSuccessForUser(hook_user_finally_good), + ?assertAuthFailureForUser(hook_user_finally_bad), + + ok = unhook(), + + %% add hook with lower priority + ok = hook(?HP_AUTHN - 1), + + %% for hook unrelataed users + ?assertAuthSuccessForUser(good), + ?assertAuthFailureForUser(bad), + ?assertAuthFailureForUser(ignore), + + %% lower-priority hook can overrride auth result, + %% because emqx_authentication permits/denies with {ok, ...} + ?assertAuthSuccessForUser(hook_user_good), + ?assertAuthFailureForUser(hook_user_bad), + ?assertAuthSuccessForUser(hook_user_finally_good), + ?assertAuthFailureForUser(hook_user_finally_bad), + + ok = unhook(); +t_combine_authn_and_callback({'end', Config}) -> + ?AUTHN:delete_chain(?config(listener_id)), + ?AUTHN:deregister_provider(?config(authn_type)), + ok. + +%%================================================================================= +%% Helpers fns +%%================================================================================= + +hook(Priority) -> + ok = emqx_hooks:put( + 'client.authenticate', {?MODULE, hook_authenticate, []}, Priority + ). + +unhook() -> + ok = emqx_hooks:del('client.authenticate', {?MODULE, hook_authenticate}). + update_config(Path, ConfigRequest) -> emqx:update_config(Path, ConfigRequest, #{rawconf_with_defaults => true}). diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 0c55687d0..38459c16a 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -43,6 +43,9 @@ init_per_suite(Config) -> timer:seconds(100) ), fun(Trace) -> + ct:pal("listener start statuses: ~p", [ + ?of_kind([listener_started, listener_not_started], Trace) + ]), %% more than one listener ?assertMatch([_ | _], ?of_kind(listener_started, Trace)) end diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 844277ba6..f61468d9b 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.7"}, + {vsn, "5.0.8"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index c236b9c28..d147877e8 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -38,11 +38,9 @@ %% Internal exports (RPC) -export([ - do_store_retained/1, - do_clear_expired/0, - do_delete_message/1, do_populate_index_meta/1, - do_reindex_batch/2 + do_reindex_batch/2, + active_indices/0 ]). %% Management API: @@ -66,6 +64,8 @@ -define(CLEAR_BATCH_SIZE, 1000). -define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_DISPATCH_WAIT, 30000). +-define(REINDEX_RPC_RETRY_INTERVAL, 1000). +-define(REINDEX_INDEX_UPDATE_WAIT, 30000). %%-------------------------------------------------------------------- %% Management API @@ -136,64 +136,41 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> end. store_retained(_, Msg = #message{topic = Topic}) -> - case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of - {atomic, ok} -> - ?tp(debug, message_retained, #{topic => Topic}), - ok; - {aborted, Reason} -> + ExpiryTime = emqx_retainer:get_expiry_time(Msg), + Tokens = topic_to_tokens(Topic), + case is_table_full() andalso is_new_topic(Tokens) of + true -> ?SLOG(error, #{ msg => "failed_to_retain_message", topic => Topic, - reason => Reason - }) - end. - -do_store_retained(#message{topic = Topic} = Msg) -> - ExpiryTime = emqx_retainer:get_expiry_time(Msg), - Tokens = topic_to_tokens(Topic), - case is_table_full() of + reason => table_is_full + }); false -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime); - _ -> - case mnesia:read(?TAB_MESSAGE, Tokens, write) of - [_] -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime); - [] -> - mnesia:abort(table_is_full) - end + do_store_retained(Msg, Tokens, ExpiryTime) end. clear_expired(_) -> - {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), - ok. - -do_clear_expired() -> NowMs = erlang:system_time(millisecond), QH = qlc:q([ - TopicTokens + RetainedMsg || #retained_message{ - topic = TopicTokens, expiry_time = ExpiryTime - } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), + } = RetainedMsg <- ets:table(?TAB_MESSAGE), (ExpiryTime =/= 0) and (ExpiryTime < NowMs) ]), QC = qlc:cursor(QH), - clear_batch(db_indices(write), QC). + clear_batch(dirty_indices(write), QC). delete_message(_, Topic) -> - {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]), - ok. - -do_delete_message(Topic) -> Tokens = topic_to_tokens(Topic), case emqx_topic:wildcard(Topic) of false -> - ok = delete_message_by_topic(Tokens, db_indices(write)); + ok = delete_message_by_topic(Tokens, dirty_indices(write)); true -> - QH = topic_search_table(Tokens), + QH = search_table(Tokens, 0), qlc:fold( - fun(TopicTokens, _) -> - ok = delete_message_by_topic(TopicTokens, db_indices(write)) + fun(RetainedMsg, _) -> + ok = delete_message_with_indices(RetainedMsg, dirty_indices(write)) end, undefined, QH @@ -206,7 +183,7 @@ read_message(_, Topic) -> match_messages(_, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), - QH = search_table(Tokens, Now), + QH = msg_table(search_table(Tokens, Now)), case batch_read_number() of all_remaining -> {ok, qlc:eval(QH), undefined}; @@ -227,10 +204,10 @@ page_read(_, Topic, Page, Limit) -> QH = case Topic of undefined -> - search_table(undefined, ['#'], Now); + msg_table(search_table(undefined, ['#'], Now)); _ -> Tokens = topic_to_tokens(Topic), - search_table(Tokens, Now) + msg_table(search_table(Tokens, Now)) end, OrderedQH = qlc:sort(QH, {order, fun compare_message/2}), Cursor = qlc:cursor(OrderedQH), @@ -281,49 +258,49 @@ reindex_status() -> %% Internal functions %%-------------------------------------------------------------------- -store_retained(Indices, Msg, Tokens, ExpiryTime) -> - ok = store_retained_message(Msg, Tokens, ExpiryTime), - ok = emqx_retainer_index:foreach_index_key( - fun(Key) -> store_retained_index(Key, ExpiryTime) end, - Indices, - Tokens - ). +do_store_retained(Msg, TopicTokens, ExpiryTime) -> + %% Retained message is stored syncronously on all core nodes + ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime), + %% Since retained message was stored syncronously on all core nodes, + %% now we are sure that + %% * either we will write correct indices + %% * or if we a replicant with outdated write indices due to reindexing, + %% the correct indices will be added by reindexing + ok = do_store_retained_indices(TopicTokens, ExpiryTime). -store_retained_message(Msg, Tokens, ExpiryTime) -> +do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> RetainedMessage = #retained_message{ - topic = Tokens, + topic = TopicTokens, msg = Msg, expiry_time = ExpiryTime }, - mnesia:write(?TAB_MESSAGE, RetainedMessage, write). + ok = mria:dirty_write_sync(?TAB_MESSAGE, RetainedMessage). -store_retained_index(Key, ExpiryTime) -> +do_store_retained_indices(TopicTokens, ExpiryTime) -> + Indices = dirty_indices(write), + ok = emqx_retainer_index:foreach_index_key( + fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, + Indices, + TopicTokens + ). + +do_store_retained_index(Key, ExpiryTime) -> RetainedIndex = #retained_index{ key = Key, expiry_time = ExpiryTime }, - mnesia:write(?TAB_INDEX, RetainedIndex, write). + mria:dirty_write(?TAB_INDEX, RetainedIndex). -topic_search_table(Tokens) -> - Index = emqx_retainer_index:select_index(Tokens, db_indices(read)), - topic_search_table(Index, Tokens). - -topic_search_table(undefined, Tokens) -> - Cond = emqx_retainer_index:condition(Tokens), - Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}], - MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]), - qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]); -topic_search_table(Index, Tokens) -> - Cond = emqx_retainer_index:condition(Index, Tokens), - Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}], - IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]), +msg_table(SearchTable) -> qlc:q([ - emqx_retainer_index:restore_topic(Key) - || #retained_index{key = Key} <- IndexQH + Msg + || #retained_message{ + msg = Msg + } <- SearchTable ]). search_table(Tokens, Now) -> - Indices = dirty_read_indices(), + Indices = dirty_indices(read), Index = emqx_retainer_index:select_index(Tokens, Indices), search_table(Index, Tokens, Now). @@ -341,26 +318,21 @@ search_table(Index, Tokens, Now) -> || TopicTokens <- Topics ]), qlc:q([ - Msg + RetainedMsg || [ #retained_message{ - msg = Msg, expiry_time = ExpiryTime - } + } = RetainedMsg ] <- RetainedMsgQH, (ExpiryTime == 0) or (ExpiryTime > Now) ]). -dirty_read_indices() -> - case ets:lookup(?TAB_INDEX_META, ?META_KEY) of - [#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices; - [] -> [] - end. - clear_batch(Indices, QC) -> {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), lists:foreach( - fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end, + fun(RetainedMsg) -> + delete_message_with_indices(RetainedMsg, Indices) + end, Rows ), case Result of @@ -369,14 +341,23 @@ clear_batch(Indices, QC) -> end. delete_message_by_topic(TopicTokens, Indices) -> + case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of + [] -> ok; + [RetainedMsg] -> delete_message_with_indices(RetainedMsg, Indices) + end. + +delete_message_with_indices(RetainedMsg, Indices) -> + #retained_message{topic = TopicTokens, expiry_time = ExpiryTime} = RetainedMsg, ok = emqx_retainer_index:foreach_index_key( fun(Key) -> - mnesia:delete({?TAB_INDEX, Key}) + mria:dirty_delete_object(?TAB_INDEX, #retained_index{ + key = Key, expiry_time = ExpiryTime + }) end, Indices, TopicTokens ), - ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}). + ok = mria:dirty_delete_object(?TAB_MESSAGE, RetainedMsg). compare_message(M1, M2) -> M1#message.timestamp =< M2#message.timestamp. @@ -415,20 +396,26 @@ qlc_next_answers(QC, N) -> make_message_match_spec(Tokens, NowMs) -> Cond = emqx_retainer_index:condition(Tokens), - MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'}, - [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}]. + MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'}, + [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. make_index_match_spec(Index, Tokens, NowMs) -> Cond = emqx_retainer_index:condition(Index, Tokens), MsHd = #retained_index{key = Cond, expiry_time = '$3'}, [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}]. --spec is_table_full() -> boolean(). is_table_full() -> Limit = emqx:get_config([retainer, backend, max_retained_messages]), Limit > 0 andalso (table_size() >= Limit). --spec table_size() -> non_neg_integer(). +is_new_topic(Tokens) -> + case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of + [_] -> + false; + [] -> + true + end. + table_size() -> mnesia:table_info(?TAB_MESSAGE, size). @@ -486,8 +473,14 @@ do_populate_index_meta(ConfigIndices) -> ) end. +dirty_indices(Type) -> + indices(ets:lookup(?TAB_INDEX_META, ?META_KEY), Type). + db_indices(Type) -> - case mnesia:read(?TAB_INDEX_META, ?META_KEY) of + indices(mnesia:read(?TAB_INDEX_META, ?META_KEY), Type). + +indices(IndexRecords, Type) -> + case IndexRecords of [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] -> case Type of read -> ReadIndices; @@ -506,10 +499,15 @@ batch_read_number() -> reindex(NewIndices, Force, StatusFun) when is_boolean(Force) andalso is_function(StatusFun, 1) -> + %% Do not run on replicants + core = mria_rlog:role(), %% Disable read indices and update write indices so that new records are written %% with correct indices. Also block parallel reindexing. case try_start_reindex(NewIndices, Force) of {atomic, ok} -> + %% Wait for all nodes to have new indices, including rlog nodes + true = wait_indices_updated({[], NewIndices}, ?REINDEX_INDEX_UPDATE_WAIT), + %% Wait for all dispatch operations to be completed to avoid %% inconsistent results. true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT), @@ -592,7 +590,7 @@ reindex_topic(Indices, Topic) -> case mnesia:read(?TAB_MESSAGE, Topic, read) of [#retained_message{expiry_time = ExpiryTime}] -> ok = emqx_retainer_index:foreach_index_key( - fun(Key) -> store_retained_index(Key, ExpiryTime) end, + fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, Indices, Topic ); @@ -627,8 +625,35 @@ do_reindex_batch(QC, Done) -> wait_dispatch_complete(Timeout) -> Nodes = mria_mnesia:running_nodes(), - {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout), + {Results, []} = emqx_retainer_proto_v2:wait_dispatch_complete(Nodes, Timeout), lists:all( fun(Result) -> Result =:= ok end, Results ). + +wait_indices_updated(_Indices, TimeLeft) when TimeLeft < 0 -> false; +wait_indices_updated(Indices, TimeLeft) -> + case timer:tc(fun() -> are_indices_updated(Indices) end) of + {_, true} -> + true; + {TimePassed, false} -> + timer:sleep(?REINDEX_RPC_RETRY_INTERVAL), + wait_indices_updated( + Indices, TimeLeft - ?REINDEX_RPC_RETRY_INTERVAL - TimePassed / 1000 + ) + end. + +active_indices() -> + {dirty_indices(read), dirty_indices(write)}. + +are_indices_updated(Indices) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_retainer_proto_v2:active_mnesia_indices(Nodes) of + {Results, []} -> + lists:all( + fun(NodeIndices) -> NodeIndices =:= Indices end, + Results + ); + _ -> + false + end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl index a576b953d..402c8003f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl @@ -50,11 +50,39 @@ retainer(["reindex", "status"]) -> retainer(["reindex", "start"]) -> retainer(["reindex", "start", "false"]); retainer(["reindex", "start", ForceParam]) -> - Force = - case ForceParam of - "true" -> true; - _ -> false - end, + case mria_rlog:role() of + core -> + Force = + case ForceParam of + "true" -> true; + _ -> false + end, + do_reindex(Force); + replicant -> + ?PRINT_MSG("Can't run reindex on a replicant node") + end; +retainer(_) -> + emqx_ctl:usage( + [ + {"retainer info", "Show the count of retained messages"}, + {"retainer topics", "Show all topics of retained messages"}, + {"retainer clean", "Clean all retained messages"}, + {"retainer clean ", "Clean retained messages by the specified topic filter"}, + {"retainer reindex status", "Show reindex status"}, + {"retainer reindex start [force]", + "Generate new retainer topic indices from config settings.\n" + "Pass true as to ignore previously started reindexing"} + ] + ). + +unload() -> + ok = emqx_ctl:unregister_command(retainer). + +%%------------------------------------------------------------------------------ +%% Private +%%------------------------------------------------------------------------------ + +do_reindex(Force) -> ?PRINT_MSG("Starting reindexing~n"), emqx_retainer_mnesia:reindex( Force, @@ -69,20 +97,4 @@ retainer(["reindex", "start", ForceParam]) -> ?PRINT("Reindexed ~p messages~n", [Done]) end ), - ?PRINT_MSG("Reindexing finished~n"); -retainer(_) -> - emqx_ctl:usage( - [ - {"retainer info", "Show the count of retained messages"}, - {"retainer topics", "Show all topics of retained messages"}, - {"retainer clean", "Clean all retained messages"}, - {"retainer clean ", "Clean retained messages by the specified topic filter"}, - {"retainer reindex status", "Show reindex status"}, - {"retainer reindex start [force]", - "Generate new retainer topic indices config settings.\n" - "Pass true as to ignore previously started reindexing"} - ] - ). - -unload() -> - ok = emqx_ctl:unregister_command(retainer). + ?PRINT_MSG("Reindexing finished~n"). diff --git a/apps/emqx_retainer/src/proto/emqx_retainer_proto_v2.erl b/apps/emqx_retainer/src/proto/emqx_retainer_proto_v2.erl new file mode 100644 index 000000000..4b98f945f --- /dev/null +++ b/apps/emqx_retainer/src/proto/emqx_retainer_proto_v2.erl @@ -0,0 +1,41 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_proto_v2). + +-behaviour(emqx_bpapi). + +-include_lib("emqx/include/bpapi.hrl"). + +-export([ + introduced_in/0, + wait_dispatch_complete/2, + active_mnesia_indices/1 +]). + +-define(TIMEOUT, 5000). + +introduced_in() -> + "5.0.13". + +-spec wait_dispatch_complete(list(node()), timeout()) -> emqx_rpc:multicall_result(ok). +wait_dispatch_complete(Nodes, Timeout) -> + rpc:multicall(Nodes, emqx_retainer_dispatcher, wait_dispatch_complete, [Timeout]). + +-spec active_mnesia_indices(list(node())) -> + emqx_rpc:multicall_result({list(emqx_retainer_index:index()), list(emqx_retainer_index:index())}). +active_mnesia_indices(Nodes) -> + rpc:multicall(Nodes, emqx_retainer_mnesia, active_indices, [], ?TIMEOUT). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index f3e46aed9..e6f4a404e 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -318,6 +318,25 @@ t_message_expiry_2(_) -> end, with_conf(ConfMod, Case). +t_table_full(_) -> + ConfMod = fun(Conf) -> + Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}} + end, + Case = fun() -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish(C1, <<"retained/t/1">>, <<"a">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/t/2">>, <<"b">>, [{qos, 0}, {retain, true}]), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/1">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/2">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(0, length(receive_messages(1))), + + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). + t_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), diff --git a/changes/v5.0.12-en.md b/changes/v5.0.12-en.md index 948232e1f..1a238c1af 100644 --- a/changes/v5.0.12-en.md +++ b/changes/v5.0.12-en.md @@ -24,6 +24,8 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan - Add more PSK ciphers support [#9505](https://github.com/emqx/emqx/pull/9505). +- Improve `emqx_retainer` write performance: get rid of transactions on write [#9372](https://github.com/emqx/emqx/pull/9372). + - Upgrade dashboard to [v1.1.3](https://github.com/emqx/emqx-dashboard-web-new/releases/tag/v1.1.3). ## Bug fixes @@ -35,3 +37,12 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan - Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464). - Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477). + +- Fixed EMQX Helm Chart deployment error [#9509](https://github.com/emqx/emqx/pull/9509) + + - Fixed the `Discovery error: no such service` error occurred during helm chart deployment, resulting in an abnormal discovery of cluster nodes. + + - Fixed that caused EMQX Helm Chart to fail when modifying some of EMQX's configuration items via environment variables + +- Fix shadowing `'client.authenticate'` callbacks by `emqx_authenticator`. Now `emqx_authenticator` + passes execution to the further callbacks if none of the authenticators matches [#9496](https://github.com/emqx/emqx/pull/9496). diff --git a/changes/v5.0.12-en.md.orig b/changes/v5.0.12-en.md.orig deleted file mode 100644 index 799c1350a..000000000 --- a/changes/v5.0.12-en.md.orig +++ /dev/null @@ -1,29 +0,0 @@ -# v5.0.12 - -## Enhancements - -- Disable global garbage collection by `node.global_gc_interval = disabled` [#9418](https://github.com/emqx/emqx/pull/9418)。 - -- Improve the CLI to avoid waste atom table when typing erros [#9416](https://github.com/emqx/emqx/pull/9416). - -- Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423). - -- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412). - Shared subscriptions are now part of the MQTT spec. Use `$share` instead. - -- Refactor authn API by replacing `POST /authentication/{id}/move` with `PUT /authentication/{id}/position/{position}`. [#9419](https://github.com/emqx/emqx/pull/9419). - Same is done for `/listeners/{listener_id}/authentication/id/...`. - -- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/). - -## Bug fixes - -- Fix that the obsolete SSL files aren't deleted after the ExHook config update [#9432](https://github.com/emqx/emqx/pull/9432). - -- Fix doc and schema for `/trace` API [#9468](https://github.com/emqx/emqx/pull/9468). - -<<<<<<< HEAD -- Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464). -======= -- Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477). ->>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test) diff --git a/changes/v5.0.12-zh.md b/changes/v5.0.12-zh.md index 7dcadc0b2..3f3494000 100644 --- a/changes/v5.0.12-zh.md +++ b/changes/v5.0.12-zh.md @@ -23,6 +23,8 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换 - 支持更多的 PSK 密码套件[#9505](https://github.com/emqx/emqx/pull/9505)。 +- 提高 `emqx_retainer` 写入性能:摆脱写入时的事务 [#9372](https://github.com/emqx/emqx/pull/9372)。 + - Dashboard 更新到 [v1.1.3](https://github.com/emqx/emqx-dashboard-web-new/releases/tag/v1.1.3). ## 修复 @@ -34,3 +36,11 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换 - 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。 - 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。 + +- 修复了 EMQX Helm Chart 部署的一些问题 [#9509](https://github.com/emqx/emqx/pull/9509) + + - 修复了 EMQX Helm Chart 部署时出现 `Discovery error: no such service` 错误,导致集群节点发现异常。 + + - 修复了 EMQX Helm Chart 通过环境变量修改部分 EMQX 的配置项时的错误 + +- 通过 `emqx_authenticator` 修复隐藏 `'client.authenticate'` 回调。 现在 `emqx_authenticator` 如果没有任何验证器匹配,则将执行传递给进一步的回调 [#9496](https://github.com/emqx/emqx/pull/9496)。 diff --git a/changes/v5.0.12-zh.md.orig b/changes/v5.0.12-zh.md.orig deleted file mode 100644 index c9da9ec7d..000000000 --- a/changes/v5.0.12-zh.md.orig +++ /dev/null @@ -1,28 +0,0 @@ -# v5.0.12 - -## 增强 - -- 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。 - -- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。 - -- 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。 - -- 支持在 Apple Silicon 架构下编译苹果系统的发行版本 [#9423](https://github.com/emqx/emqx/pull/9423)。 - -- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。 - 共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`。 - -- 重构认证 API,使用 `PUT /authentication/{id}/position/{position}` 代替了 `POST /authentication/{id}/move` [#9419](https://github.com/emqx/emqx/pull/9419)。 - -## 修复 - -- 修复 ExHook 更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9432](https://github.com/emqx/emqx/pull/9432)。 - -- 修复 /trace API 的返回值格式和相关文档 [#9468](https://github.com/emqx/emqx/pull/9468)。 - -<<<<<<< HEAD -- 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。 -======= -- 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。 ->>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test) diff --git a/deploy/charts/emqx-enterprise/templates/configmap.yaml b/deploy/charts/emqx-enterprise/templates/configmap.yaml index e0563d02a..5086f85f6 100644 --- a/deploy/charts/emqx-enterprise/templates/configmap.yaml +++ b/deploy/charts/emqx-enterprise/templates/configmap.yaml @@ -10,10 +10,25 @@ metadata: app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} data: + EMQX_NAME: {{ .Release.Name }} + {{- if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY) "k8s" }} + EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443" + EMQX_CLUSTER__K8S__SERVICE_NAME: {{ include "emqx.fullname" . }}-headless + EMQX_CLUSTER__K8S__NAMESPACE: {{ .Release.Namespace }} + EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname" + EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local" + {{- else if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY) "dns" }} + EMQX_CLUSTER__DNS__NAME: "{{ include "emqx.fullname" . }}-headless.{{ .Release.Namespace }}.svc.cluster.local" + EMQX_CLUSTER__DNS__RECORD_TYPE: "srv" + {{- end -}} {{- range $index, $value := .Values.emqxConfig }} {{- if $value }} {{- $key := (regexReplaceAllLiteral "\\." (regexReplaceAllLiteral "EMQX[_\\.]" (upper (trimAll " " $index)) "") "__") }} + {{- if or (kindIs "map" $value) (kindIs "slice" $value) }} + {{ print "EMQX_" $key }}: {{ tpl (printf "%q" (toJson $value)) $ }} + {{- else }} {{ print "EMQX_" $key }}: "{{ tpl (printf "%v" $value) $ }}" {{- end }} {{- end }} + {{- end }} {{- end }} diff --git a/deploy/charts/emqx-enterprise/values.yaml b/deploy/charts/emqx-enterprise/values.yaml index 7827d6afb..10426514e 100644 --- a/deploy/charts/emqx-enterprise/values.yaml +++ b/deploy/charts/emqx-enterprise/values.yaml @@ -7,6 +7,8 @@ replicaCount: 3 image: repository: emqx/emqx-enterprise pullPolicy: IfNotPresent + # Overrides the image tag whose default is the chart appVersion. + tag: "" ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ @@ -92,19 +94,6 @@ initContainers: {} ## EMQX configuration item, see the documentation (https://hub.docker.com/r/emqx/emqx) emqxConfig: EMQX_CLUSTER__DISCOVERY_STRATEGY: "dns" - EMQX_CLUSTER__DNS__NAME: "{{ .Release.Name }}-headless.{{ .Release.Namespace }}.svc.cluster.local" - EMQX_CLUSTER__DNS__RECORD_TYPE: "srv" - # EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s" - # EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443" - # EMQX_CLUSTER__K8S__SERVICE_NAME: "{{ .Release.Name }}-headless" - # EMQX_CLUSTER__K8S__NAMESPACE: "{{ .Release.Namespace }}" - ## The address type is used to extract host from k8s service. - ## Value: ip | dns | hostname - ## Note:Hostname is only supported after v4.0-rc.2 - EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname" - EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local" - ## if EMQX_CLUSTER__K8S__ADDRESS_TYPE eq dns - # EMQX_CLUSTER__K8S__SUFFIX: "pod.cluster.local" EMQX_DASHBOARD__DEFAULT_USERNAME: "admin" EMQX_DASHBOARD__DEFAULT_PASSWORD: "public" diff --git a/deploy/charts/emqx/templates/configmap.yaml b/deploy/charts/emqx/templates/configmap.yaml index e0563d02a..5086f85f6 100644 --- a/deploy/charts/emqx/templates/configmap.yaml +++ b/deploy/charts/emqx/templates/configmap.yaml @@ -10,10 +10,25 @@ metadata: app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} data: + EMQX_NAME: {{ .Release.Name }} + {{- if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY) "k8s" }} + EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443" + EMQX_CLUSTER__K8S__SERVICE_NAME: {{ include "emqx.fullname" . }}-headless + EMQX_CLUSTER__K8S__NAMESPACE: {{ .Release.Namespace }} + EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname" + EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local" + {{- else if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY) "dns" }} + EMQX_CLUSTER__DNS__NAME: "{{ include "emqx.fullname" . }}-headless.{{ .Release.Namespace }}.svc.cluster.local" + EMQX_CLUSTER__DNS__RECORD_TYPE: "srv" + {{- end -}} {{- range $index, $value := .Values.emqxConfig }} {{- if $value }} {{- $key := (regexReplaceAllLiteral "\\." (regexReplaceAllLiteral "EMQX[_\\.]" (upper (trimAll " " $index)) "") "__") }} + {{- if or (kindIs "map" $value) (kindIs "slice" $value) }} + {{ print "EMQX_" $key }}: {{ tpl (printf "%q" (toJson $value)) $ }} + {{- else }} {{ print "EMQX_" $key }}: "{{ tpl (printf "%v" $value) $ }}" {{- end }} {{- end }} + {{- end }} {{- end }} diff --git a/deploy/charts/emqx/values.yaml b/deploy/charts/emqx/values.yaml index b648f070f..f6ba9eda4 100644 --- a/deploy/charts/emqx/values.yaml +++ b/deploy/charts/emqx/values.yaml @@ -94,19 +94,6 @@ initContainers: {} ## EMQX configuration item, see the documentation (https://hub.docker.com/r/emqx/emqx) emqxConfig: EMQX_CLUSTER__DISCOVERY_STRATEGY: "dns" - EMQX_CLUSTER__DNS__NAME: "{{ .Release.Name }}-headless.{{ .Release.Namespace }}.svc.cluster.local" - EMQX_CLUSTER__DNS__RECORD_TYPE: "srv" - # EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s" - # EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443" - # EMQX_CLUSTER__K8S__SERVICE_NAME: "{{ .Release.Name }}-headless" - # EMQX_CLUSTER__K8S__NAMESPACE: "{{ .Release.Namespace }}" - ## The address type is used to extract host from k8s service. - ## Value: ip | dns | hostname - ## Note:Hostname is only supported after v4.0-rc.2 - EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname" - EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local" - ## if EMQX_CLUSTER__K8S__ADDRESS_TYPE eq dns - # EMQX_CLUSTER__K8S__SUFFIX: "pod.cluster.local" EMQX_DASHBOARD__DEFAULT_USERNAME: "admin" EMQX_DASHBOARD__DEFAULT_PASSWORD: "public" diff --git a/mix.exs b/mix.exs index f43ca7119..26866b700 100644 --- a/mix.exs +++ b/mix.exs @@ -52,7 +52,7 @@ defmodule EMQXUmbrella.MixProject do {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.13.6", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.13.7", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, diff --git a/rebar.config b/rebar.config index 687f49cea..fdf86eb5c 100644 --- a/rebar.config +++ b/rebar.config @@ -54,7 +54,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}