diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index db54b6177..174663e80 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -24,7 +24,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.201"}}}. +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}. Dialyzer = fun(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index cea22652d..1e4940965 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -298,9 +298,9 @@ takeover_session_begin(ClientId) -> takeover_session_begin(ClientId, ChanPid) when is_pid(ChanPid) -> case takeover_session(ClientId, ChanPid) of - {living, ConnMod, Session} -> + {living, ConnMod, ChanPid, Session} -> {ok, Session, {ConnMod, ChanPid}}; - none -> + _ -> none end; takeover_session_begin(_ClientId, undefined) -> @@ -368,13 +368,20 @@ do_takeover_begin(ClientId, ChanPid) when node(ChanPid) == node() -> ConnMod when is_atom(ConnMod) -> case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> - {living, ConnMod, Session}; + {living, ConnMod, ChanPid, Session}; {error, Reason} -> error(Reason) end end; do_takeover_begin(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)). + case wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)) of + %% NOTE: v5.3.0 + {living, ConnMod, Session} -> + {living, ConnMod, ChanPid, Session}; + %% NOTE: other versions + Res -> + Res + end. %% @doc Discard all the sessions identified by the ClientId. -spec discard_session(emqx_types:clientid()) -> ok. diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index 29dec50cd..6ccf596c3 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -65,8 +65,10 @@ get_chann_conn_mod(ClientId, ChanPid) -> -spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> none - | {expired | persistent, emqx_session:session()} | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + %% NOTE: v5.3.0 + | {living, _ConnMod :: atom(), emqx_session:session()} + | {expired | persistent, emqx_session:session()} | {badrpc, _}. takeover_session(ClientId, ChanPid) -> rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). diff --git a/apps/emqx_auth/include/emqx_authz.hrl b/apps/emqx_auth/include/emqx_authz.hrl index f017b9be5..9af795a82 100644 --- a/apps/emqx_auth/include/emqx_authz.hrl +++ b/apps/emqx_auth/include/emqx_authz.hrl @@ -163,3 +163,8 @@ -define(DEFAULT_RULE_QOS, [0, 1, 2]). -define(DEFAULT_RULE_RETAIN, all). + +-define(BUILTIN_SOURCES, [ + {client_info, emqx_authz_client_info}, + {file, emqx_authz_file} +]). diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index 5eca842f4..0d668bc20 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -26,6 +26,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). @@ -446,7 +447,7 @@ handle_continue(initialize_authentication, #{init_done := true} = State) -> {noreply, State}; handle_continue(initialize_authentication, #{providers := Providers} = State) -> InitDone = initialize_authentication(Providers), - {noreply, State#{init_done := InitDone}}. + {noreply, maybe_hook(State#{init_done := InitDone})}. handle_cast(Req, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), @@ -495,6 +496,7 @@ do_initialize_authentication(Providers, Chains, _HasProviders = true) -> Chains ), ok = unhook_deny(), + ?tp(info, authn_chains_initialization_done, #{}), true. initialize_chain_authentication(_Providers, _ChainName, []) -> diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl index 0ec300406..80fdd4f2d 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl @@ -88,8 +88,7 @@ init() -> emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE), emqx_conf:add_handler(?ROOT_KEY, ?MODULE), ok = emqx_hooks:put('client.authorize', {?MODULE, authorize_deny, []}, ?HP_AUTHZ), - ok = register_source(client_info, emqx_authz_client_info), - ok = register_source(file, emqx_authz_file), + ok = register_builtin_sources(), ok. register_source(Type, Module) -> @@ -124,6 +123,14 @@ are_all_providers_registered() -> false end. +register_builtin_sources() -> + lists:foreach( + fun({Type, Module}) -> + register_source(Type, Module) + end, + ?BUILTIN_SOURCES + ). + configured_types() -> lists:map( fun(#{type := Type}) -> Type end, @@ -186,8 +193,14 @@ pre_config_update(Path, Cmd, Sources) -> {error, Reason} -> {error, Reason}; NSources -> {ok, NSources} catch - Error:Reason:Stack -> + throw:Reason -> ?SLOG(info, #{ + msg => "error_in_pre_config_update", + reason => Reason + }), + {error, Reason}; + Error:Reason:Stack -> + ?SLOG(warning, #{ msg => "error_in_pre_config_update", exception => Error, reason => Reason, @@ -596,10 +609,6 @@ maybe_convert_sources( maybe_convert_sources(RawConf, _Fun) -> RawConf. -% read_acl_file(#{<<"path">> := Path} = Source) -> -% {ok, Rules} = emqx_authz_file:read_file(Path), -% maps:remove(<<"path">>, Source#{<<"rules">> => Rules}). - %%------------------------------------------------------------------------------ %% Extended Features %%------------------------------------------------------------------------------ @@ -706,7 +715,7 @@ maybe_read_source_files_safe(Source0) -> catch Error:Reason:Stacktrace -> ?SLOG(error, #{ - msg => "error_in_maybe_read_source_files", + msg => "error_when_reading_source_files", exception => Error, reason => Reason, stacktrace => Stacktrace diff --git a/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl b/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl index 48542ebfc..80f37be26 100644 --- a/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -48,9 +49,11 @@ init_per_testcase(_Case, Config) -> work_dir => ?config(priv_dir, Config) } ), + ok = snabbkaffe:start_trace(), [{apps, Apps} | Config]. end_per_testcase(_Case, Config) -> + ok = snabbkaffe:stop(), _ = application:stop(emqx_auth), ok = emqx_cth_suite:stop(?config(apps, Config)), ok. @@ -66,10 +69,14 @@ t_initialize(_Config) -> emqx_access_control:authenticate(?CLIENTINFO) ), - ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]), + ?assertWaitEvent( + ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]), + #{?snk_kind := authn_chains_initialization_done}, + 100 + ), ?assertMatch( - {error, not_authorized}, + {error, bad_username_or_password}, emqx_access_control:authenticate(?CLIENTINFO) ), diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl index 2e45c5c11..1af7d4d1d 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl @@ -20,7 +20,6 @@ -include("emqx_authz.hrl"). -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). @@ -35,33 +34,18 @@ groups() -> []. init_per_suite(Config) -> - meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end), - meck:expect(emqx_resource, remove_local, fun(_) -> ok end), - meck:expect( - emqx_authz_file, - acl_conf_file, - fun() -> - emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf") - end - ), Apps = emqx_cth_suite:start( [ emqx, {emqx_conf, "authorization { cache { enable = false }, no_match = deny, sources = [] }"}, - emqx_auth, - emqx_auth_http, - emqx_auth_mnesia, - emqx_auth_redis, - emqx_auth_postgresql, - emqx_auth_mysql, - emqx_auth_mongodb + emqx_auth ], #{ work_dir => filename:join(?config(priv_dir, Config), ?MODULE) } ), + ok = emqx_authz_test_lib:register_fake_sources([http, redis, mongodb, mysql, postgresql]), [{suite_apps, Apps} | Config]. end_per_suite(Config) -> @@ -73,8 +57,8 @@ end_per_suite(Config) -> <<"sources">> => [] } ), + ok = emqx_authz_test_lib:deregister_sources(), emqx_cth_suite:stop(?config(suite_apps, Config)), - meck:unload(emqx_resource), ok. init_per_testcase(TestCase, Config) when @@ -102,7 +86,7 @@ end_per_testcase(_TestCase, _Config) -> emqx_common_test_helpers:call_janitor(), ok. --define(SOURCE1, #{ +-define(SOURCE_HTTP, #{ <<"type">> => <<"http">>, <<"enable">> => true, <<"url">> => <<"https://example.com:443/a/b?c=d">>, @@ -111,7 +95,7 @@ end_per_testcase(_TestCase, _Config) -> <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> }). --define(SOURCE2, #{ +-define(SOURCE_MONGODB, #{ <<"type">> => <<"mongodb">>, <<"enable">> => true, <<"mongo_type">> => <<"single">>, @@ -123,7 +107,7 @@ end_per_testcase(_TestCase, _Config) -> <<"collection">> => <<"authz">>, <<"filter">> => #{<<"a">> => <<"b">>} }). --define(SOURCE3, #{ +-define(SOURCE_MYSQL, #{ <<"type">> => <<"mysql">>, <<"enable">> => true, <<"server">> => <<"127.0.0.1:27017">>, @@ -135,7 +119,7 @@ end_per_testcase(_TestCase, _Config) -> <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). --define(SOURCE4, #{ +-define(SOURCE_POSTGRESQL, #{ <<"type">> => <<"postgresql">>, <<"enable">> => true, <<"server">> => <<"127.0.0.1:27017">>, @@ -147,7 +131,7 @@ end_per_testcase(_TestCase, _Config) -> <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). --define(SOURCE5, #{ +-define(SOURCE_REDIS, #{ <<"type">> => <<"redis">>, <<"redis_type">> => <<"single">>, <<"enable">> => true, @@ -160,22 +144,22 @@ end_per_testcase(_TestCase, _Config) -> <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>> }). --define(FILE_SOURCE(Rules), #{ +-define(SOURCE_FILE(Rules), #{ <<"type">> => <<"file">>, <<"enable">> => true, <<"rules">> => Rules }). --define(SOURCE6, - ?FILE_SOURCE( +-define(SOURCE_FILE1, + ?SOURCE_FILE( << "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}." "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}." >> ) ). --define(SOURCE7, - ?FILE_SOURCE( +-define(SOURCE_FILE2, + ?SOURCE_FILE( << "{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n" "{deny, all}." @@ -183,15 +167,6 @@ end_per_testcase(_TestCase, _Config) -> ) ). --define(BAD_FILE_SOURCE2, #{ - <<"type">> => <<"file">>, - <<"enable">> => true, - <<"rules">> => - << - "{not_allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}." - >> -}). - %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -199,24 +174,23 @@ end_per_testcase(_TestCase, _Config) -> -define(UPDATE_ERROR(Err), {error, {pre_config_update, emqx_authz, Err}}). t_bad_file_source(_) -> - BadContent = ?FILE_SOURCE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>), + BadContent = ?SOURCE_FILE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>), BadContentErr = {bad_acl_file_content, {1, erl_parse, ["syntax error before: ", []]}}, - BadRule = ?FILE_SOURCE(<<"{allow,{username,\"bar\"},publish}.">>), + BadRule = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},publish}.">>), BadRuleErr = {invalid_authorization_rule, {allow, {username, "bar"}, publish}}, - BadPermission = ?FILE_SOURCE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>), + BadPermission = ?SOURCE_FILE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>), BadPermissionErr = {invalid_authorization_permission, not_allow}, - BadAction = ?FILE_SOURCE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>), + BadAction = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>), BadActionErr = {invalid_authorization_action, pubsub}, lists:foreach( fun({Source, Error}) -> File = emqx_authz_file:acl_conf_file(), - {ok, Bin1} = file:read_file(File), + ?assertEqual({error, enoent}, file:read_file(File)), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_REPLACE, [Source])), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_PREPEND, Source)), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_APPEND, Source)), - %% Check file content not changed if update failed - {ok, Bin2} = file:read_file(File), - ?assertEqual(Bin1, Bin2) + %% Check file is not created if update failed; + ?assertEqual({error, enoent}, file:read_file(File)) end, [ {BadContent, BadContentErr}, @@ -230,14 +204,32 @@ t_bad_file_source(_) -> emqx_conf:get([authorization, sources], []) ). +t_good_file_source(_) -> + RuleBin = <<"{allow,{username,\"bar\"}, publish, [\"test\"]}.">>, + GoodFileSource = ?SOURCE_FILE(RuleBin), + File = emqx_authz_file:acl_conf_file(), + lists:foreach( + fun({Command, Argument}) -> + _ = file:delete(File), + ?assertMatch({ok, _}, emqx_authz:update(Command, Argument)), + ?assertEqual({ok, RuleBin}, file:read_file(File)), + {ok, _} = emqx_authz:update(?CMD_REPLACE, []) + end, + [ + {?CMD_REPLACE, [GoodFileSource]}, + {?CMD_PREPEND, GoodFileSource}, + {?CMD_APPEND, GoodFileSource} + ] + ). + t_update_source(_) -> %% replace all - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE3]), - {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE2), - {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE1), - {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE4), - {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE5), - {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE6), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_MYSQL]), + {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_MONGODB), + {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_HTTP), + {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_POSTGRESQL), + {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_REDIS), + {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_FILE1), ?assertMatch( [ @@ -251,19 +243,23 @@ t_update_source(_) -> emqx_conf:get([authorization, sources], []) ), - {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE3#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE4#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE5#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE6#{<<"enable">> := true}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := true}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := true}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := true}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{ + <<"enable">> := true + }), + {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := true}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := true}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE3#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE4#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE5#{<<"enable">> := false}), - {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE6#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{ + <<"enable">> := false + }), + {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := false}), + {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := false}), ?assertMatch( [ @@ -295,7 +291,12 @@ t_replace_all(_) -> Conf = emqx:get_raw_config(RootKey), emqx_authz_utils:update_config(RootKey, Conf#{ <<"sources">> => [ - ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1 + ?SOURCE_FILE1, + ?SOURCE_REDIS, + ?SOURCE_POSTGRESQL, + ?SOURCE_MYSQL, + ?SOURCE_MONGODB, + ?SOURCE_HTTP ] }), %% config @@ -335,7 +336,7 @@ t_replace_all(_) -> {ok, _}, emqx_authz_utils:update_config( RootKey, - Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]} + Conf#{<<"sources">> => [?SOURCE_HTTP#{<<"enable">> => false}]} ) ), %% hooks status @@ -351,7 +352,7 @@ t_replace_all(_) -> ok. t_delete_source(_) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_HTTP]), ?assertMatch([#{type := http, enable := true}], emqx_conf:get([authorization, sources], [])), @@ -363,12 +364,12 @@ t_move_source(_) -> {ok, _} = emqx_authz:update( ?CMD_REPLACE, [ - ?SOURCE1, - ?SOURCE2, - ?SOURCE3, - ?SOURCE4, - ?SOURCE5, - ?SOURCE6 + ?SOURCE_HTTP, + ?SOURCE_MONGODB, + ?SOURCE_MYSQL, + ?SOURCE_POSTGRESQL, + ?SOURCE_REDIS, + ?SOURCE_FILE1 ] ), ?assertMatch( @@ -437,15 +438,26 @@ t_move_source(_) -> ok. +t_pre_config_update_crash(_) -> + ok = meck:new(emqx_authz_fake_source, [non_strict, passthrough, no_history]), + ok = meck:expect(emqx_authz_fake_source, write_files, fun(_) -> meck:exception(error, oops) end), + ?assertEqual( + {error, {pre_config_update, emqx_authz, oops}}, + emqx_authz:update(?CMD_APPEND, ?SOURCE_HTTP) + ), + ok = meck:unload(emqx_authz_fake_source). + t_get_enabled_authzs_none_enabled(_Config) -> ?assertEqual([], emqx_authz:get_enabled_authzs()). t_get_enabled_authzs_some_enabled(_Config) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4, ?SOURCE5#{<<"enable">> := false}]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [ + ?SOURCE_POSTGRESQL, ?SOURCE_REDIS#{<<"enable">> := false} + ]), ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()). t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), {ok, C} = emqtt:start_link([ {username, <<"some_client">>}, {will_topic, <<"some_client/lwt">>}, @@ -473,7 +485,7 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> ok. t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), {ok, C} = emqtt:start_link([ {username, <<"some_client">>}, {will_topic, <<"some_client/lwt">>}, @@ -530,7 +542,7 @@ t_publish_last_will_testament_denied_topic(_Config) -> %% and then gets banned and kicked out while connected. Should not %% publish LWT. t_publish_last_will_testament_banned_client_connecting(_Config) -> - {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), Username = <<"some_client">>, ClientId = <<"some_clientid">>, LWTPayload = <<"should not be published">>, diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl index 1ead754ec..e9dfa6a7b 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl @@ -29,7 +29,7 @@ -define(PGSQL_HOST, "pgsql"). -define(REDIS_SINGLE_HOST, "redis"). --define(SOURCE1, #{ +-define(SOURCE_REDIS1, #{ <<"type">> => <<"http">>, <<"enable">> => true, <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>, @@ -38,7 +38,7 @@ <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> }). --define(SOURCE2, #{ +-define(SOURCE_MONGODB, #{ <<"type">> => <<"mongodb">>, <<"enable">> => true, <<"mongo_type">> => <<"single">>, @@ -50,7 +50,7 @@ <<"collection">> => <<"fake">>, <<"filter">> => #{<<"a">> => <<"b">>} }). --define(SOURCE3, #{ +-define(SOURCE_MYSQL, #{ <<"type">> => <<"mysql">>, <<"enable">> => true, <<"server">> => <>, @@ -62,7 +62,7 @@ <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). --define(SOURCE4, #{ +-define(SOURCE_POSTGRESQL, #{ <<"type">> => <<"postgresql">>, <<"enable">> => true, <<"server">> => <>, @@ -74,7 +74,7 @@ <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). --define(SOURCE5, #{ +-define(SOURCE_REDIS2, #{ <<"type">> => <<"redis">>, <<"enable">> => true, <<"servers">> => <>, @@ -85,7 +85,7 @@ <<"ssl">> => #{<<"enable">> => false}, <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>> }). --define(SOURCE6, #{ +-define(SOURCE_FILE, #{ <<"type">> => <<"file">>, <<"enable">> => true, <<"rules">> => @@ -120,12 +120,6 @@ init_per_suite(Config) -> {emqx_conf, "authorization { cache { enable = false }, no_match = deny, sources = [] }"}, emqx_auth, - emqx_auth_http, - emqx_auth_mnesia, - emqx_auth_redis, - emqx_auth_postgresql, - emqx_auth_mysql, - emqx_auth_mongodb, emqx_management, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], @@ -133,6 +127,7 @@ init_per_suite(Config) -> work_dir => filename:join(?config(priv_dir, Config), ?MODULE) } ), + ok = emqx_authz_test_lib:register_fake_sources([http, mongodb, mysql, postgresql, redis]), _ = emqx_common_test_http:create_default_app(), [{suite_apps, Apps} | Config]. @@ -192,9 +187,11 @@ t_api(_) -> begin {ok, 204, _} = request(post, uri(["authorization", "sources"]), Source) end - || Source <- lists:reverse([?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]) + || Source <- lists:reverse([ + ?SOURCE_MONGODB, ?SOURCE_MYSQL, ?SOURCE_POSTGRESQL, ?SOURCE_REDIS2, ?SOURCE_FILE + ]) ], - {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1), + {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_REDIS1), {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []), Sources = get_sources(Result2), @@ -214,7 +211,7 @@ t_api(_) -> {ok, 204, _} = request( put, uri(["authorization", "sources", "http"]), - ?SOURCE1#{<<"enable">> := false} + ?SOURCE_REDIS1#{<<"enable">> := false} ), {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []), ?assertMatch( @@ -238,7 +235,7 @@ t_api(_) -> {ok, 204, _} = request( put, uri(["authorization", "sources", "mongodb"]), - ?SOURCE2#{ + ?SOURCE_MONGODB#{ <<"ssl">> => #{ <<"enable">> => <<"true">>, <<"cacertfile">> => Cacertfile, @@ -279,7 +276,7 @@ t_api(_) -> {ok, 204, _} = request( put, uri(["authorization", "sources", "mongodb"]), - ?SOURCE2#{ + ?SOURCE_MONGODB#{ <<"ssl">> => #{ <<"enable">> => <<"true">>, <<"cacertfile">> => Cacert, @@ -329,19 +326,19 @@ t_api(_) -> {ok, 204, _} = request( put, uri(["authorization", "sources", "mysql"]), - ?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>} + ?SOURCE_MYSQL#{<<"server">> := <<"192.168.1.100:3306">>} ), {ok, 204, _} = request( put, uri(["authorization", "sources", "postgresql"]), - ?SOURCE4#{<<"server">> := <<"fake">>} + ?SOURCE_POSTGRESQL#{<<"server">> := <<"fake">>} ), {ok, 204, _} = request( put, uri(["authorization", "sources", "redis"]), - ?SOURCE5#{ + ?SOURCE_REDIS2#{ <<"servers">> := [ <<"192.168.1.100:6379">>, <<"192.168.1.100:6380">> @@ -413,7 +410,7 @@ t_api(_) -> #{<<"type">> => <<"built_in_database">>, <<"enable">> => false} ), - {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6), + {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_FILE), {ok, Client} = emqtt:start_link( [ @@ -505,7 +502,9 @@ t_api(_) -> ok. t_source_move(_) -> - {ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5]), + {ok, _} = emqx_authz:update(replace, [ + ?SOURCE_REDIS1, ?SOURCE_MONGODB, ?SOURCE_MYSQL, ?SOURCE_POSTGRESQL, ?SOURCE_REDIS2 + ]), ?assertMatch( [ #{type := http}, diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_fake_source.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_fake_source.erl new file mode 100644 index 000000000..88a54f972 --- /dev/null +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_fake_source.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_fake_source). + +-behaviour(emqx_authz_source). + +%% APIs +-export([ + description/0, + create/1, + update/1, + destroy/1, + authorize/4 +]). + +%%-------------------------------------------------------------------- +%% emqx_authz callbacks +%%-------------------------------------------------------------------- + +description() -> + "Fake AuthZ". + +create(Source) -> + Source. + +update(Source) -> + Source. + +destroy(_Source) -> ok. + +authorize(_Client, _PubSub, _Topic, _Source) -> + nomatch. diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl index 33035c766..cd8fca4df 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl @@ -51,6 +51,24 @@ setup_config(BaseConfig, SpecialParams) -> {error, Reason} -> {error, Reason} end. +register_fake_sources(SourceTypes) -> + lists:foreach( + fun(Type) -> + emqx_authz_source_registry:register(Type, emqx_authz_fake_source) + end, + SourceTypes + ). + +deregister_sources() -> + {BuiltInTypes, _} = lists:unzip(?BUILTIN_SOURCES), + SourceTypes = emqx_authz_source_registry:get(), + lists:foreach( + fun(Type) -> + emqx_authz_source_registry:register(Type, emqx_authz_fake_source) + end, + SourceTypes -- BuiltInTypes + ). + %%-------------------------------------------------------------------- %% Table-based test helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 26028e8ab..59661d7c0 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 0610ee743..afea652ef 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -332,7 +332,7 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) -> end. do_get_status(Conn) -> - ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")). + ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")). do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) -> ok; diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index c74e3dde4..e5c559bd5 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {env, []}, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 0191d5e45..5d1b1947c 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -57,6 +57,8 @@ -define(DEFAULT_PIPELINE_SIZE, 100). -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000). +-define(READACT_REQUEST_NOTE, "the request body is redacted due to security reasons"). + %%===================================================================== %% Hocon schema @@ -303,7 +305,8 @@ on_query( "QUERY", "http_connector_received", #{ - request => redact(Request), + request => redact_request(Request), + note => ?READACT_REQUEST_NOTE, connector => InstId, state => redact(State) } @@ -329,7 +332,7 @@ on_query( {error, #{status_code := StatusCode}} -> ?SLOG(error, #{ msg => "http_connector_do_request_received_error_response", - note => "the body will be redacted due to security reasons", + note => ?READACT_REQUEST_NOTE, request => redact_request(NRequest), connector => InstId, status_code => StatusCode @@ -338,7 +341,8 @@ on_query( {error, Reason} -> ?SLOG(error, #{ msg => "http_connector_do_request_failed", - request => redact(NRequest), + note => ?READACT_REQUEST_NOTE, + request => redact_request(NRequest), reason => Reason, connector => InstId }), @@ -379,7 +383,8 @@ on_query_async( "QUERY_ASYNC", "http_connector_received", #{ - request => redact(Request), + request => redact_request(Request), + note => ?READACT_REQUEST_NOTE, connector => InstId, state => redact(State) } diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 362ead229..749250306 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -32,9 +32,9 @@ -define(kafka_producers, kafka_producers). query_mode(#{kafka := #{query_mode := sync}}) -> - simple_sync; + simple_sync_internal_buffer; query_mode(_) -> - simple_async. + simple_async_internal_buffer. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 4450030e0..b704fc92c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -133,7 +133,7 @@ t_query_mode(CtConfig) -> end, fun(Trace) -> %% We should have a sync Snabbkaffe trace - ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) + ?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace)) end ), ?check_trace( @@ -141,7 +141,7 @@ t_query_mode(CtConfig) -> publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) end, fun(Trace) -> - %% We should have a sync Snabbkaffe trace + %% We should have an async Snabbkaffe trace ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) end ), diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index 16c9ce59f..b012874f8 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 2fc44e5ca..33ac83ee1 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -73,7 +73,7 @@ callback_mode() -> async_if_possible. query_mode(_Config) -> - simple_async. + simple_async_internal_buffer. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> diff --git a/apps/emqx_gcp_device/src/emqx_gcp_device.erl b/apps/emqx_gcp_device/src/emqx_gcp_device.erl index ae976ce20..381c237dd 100644 --- a/apps/emqx_gcp_device/src/emqx_gcp_device.erl +++ b/apps/emqx_gcp_device/src/emqx_gcp_device.erl @@ -8,7 +8,10 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). --define(AUTHN_SHARD, ?MODULE). +%% NOTE +%% We share the shard with `emqx_auth_mnesia` to ensure backward compatibility +%% with EMQX 5.2.x and earlier. +-define(AUTHN_SHARD, emqx_authn_shard). %% Management -export([put_device/1, get_device/1, remove_device/1]). diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 6a90a1e0a..0c8a4b31e 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,11 +22,18 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. --type query_mode() :: simple_sync | simple_async | sync | async | no_queries. +-type query_mode() :: + simple_sync + | simple_async + | simple_sync_internal_buffer + | simple_async_internal_buffer + | sync + | async + | no_queries. -type result() :: term(). -type reply_fun() :: - {fun((result(), Args :: term()) -> any()), Args :: term()} - | {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()} + {fun((...) -> any()), Args :: [term()]} + | {fun((...) -> any()), Args :: [term()], reply_context()} | undefined. -type reply_context() :: #{reply_dropped => boolean()}. -type query_opts() :: #{ @@ -36,7 +43,6 @@ expire_at => infinity | integer(), async_reply_fun => reply_fun(), simple_query => boolean(), - is_buffer_supported => boolean(), reply_to => reply_fun() }. -type resource_data() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 92701a8c4..df50625ab 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -311,6 +311,20 @@ query(ResId, Request, Opts) -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); + {simple_async_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); + {simple_sync_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_internal_buffer_query( + ResId, Request, Opts + ); {sync, _} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {async, _} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9026fe65c..a47884161 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -39,7 +39,8 @@ -export([ simple_sync_query/2, simple_sync_query/3, - simple_async_query/3 + simple_async_query/3, + simple_sync_internal_buffer_query/3 ]). -export([ @@ -53,7 +54,9 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]). +-export([ + handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3 +]). -export([clear_disk_queue_dir/2]). @@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) -> _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. +%% This is a hack to handle cases where the underlying connector has internal buffering +%% (e.g.: Kafka and Pulsar producers). Since the message may be inernally retried at a +%% later time, we can't bump metrics immediatelly if the return value is not a success +%% (e.g.: if the call timed out, but the message was enqueued nevertheless). +-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term(). +simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> + ?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}), + ReplyAlias = alias([reply]), + try + MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined), + QueryOpts1 = QueryOpts0#{ + reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]} + }, + QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), + case simple_async_query(Id, Request, QueryOpts) of + {error, _} = Error -> + Error; + {async_return, {error, _} = Error} -> + Error; + {async_return, {ok, _Pid}} -> + receive + {ReplyAlias, Response} -> + Response + after Timeout -> + _ = unalias(ReplyAlias), + receive + {ReplyAlias, Response} -> + Response + after 0 -> {error, timeout} + end + end + end + after + _ = unalias(ReplyAlias) + end. + simple_query_opts() -> ensure_expire_at(#{simple_query => true, timeout => infinity}). @@ -1049,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> end. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when - ResQM =:= simple_async; ResQM =:= simple_sync + ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer -> %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, @@ -1908,6 +1947,12 @@ reply_call(Alias, Response) -> erlang:send(Alias, {Alias, Response}), ok. +%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' +%% callbacks. +reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> + ?MODULE:reply_call(ReplyAlias, Response), + do_reply_caller(MaybeReplyTo, Response). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 288083172..ec6165f85 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -147,9 +147,9 @@ create(ResId, Group, ResourceType, Config, Opts) -> QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), case QueryMode of %% the resource has built-in buffer, so there is no need for resource workers - simple_sync -> + simple_sync_internal_buffer -> ok; - simple_async -> + simple_async_internal_buffer -> ok; %% The resource is a consumer resource, so there is no need for resource workers no_queries -> diff --git a/changes/ce/fix-11747.en.md b/changes/ce/fix-11747.en.md new file mode 100644 index 000000000..db1a76a47 --- /dev/null +++ b/changes/ce/fix-11747.en.md @@ -0,0 +1,2 @@ +Update QUIC stack to msquic 2.2.3. + diff --git a/changes/ce/fix-11747.zh.md b/changes/ce/fix-11747.zh.md new file mode 100644 index 000000000..39db44512 --- /dev/null +++ b/changes/ce/fix-11747.zh.md @@ -0,0 +1,2 @@ +更新 QUIC 栈至 msquic 2.2.3 + diff --git a/changes/ee/fix-11724.en.md b/changes/ee/fix-11724.en.md new file mode 100644 index 000000000..633092357 --- /dev/null +++ b/changes/ee/fix-11724.en.md @@ -0,0 +1 @@ +Fixed a metrics issue where messages sent to Kafka would count as failed even when they were successfully sent late due to its internal buffering. diff --git a/changes/ee/fix-11733.en.md b/changes/ee/fix-11733.en.md new file mode 100644 index 000000000..ede662bf9 --- /dev/null +++ b/changes/ee/fix-11733.en.md @@ -0,0 +1 @@ +Resolved an incompatibility issue that led to crashes during session takeover / channel eviction when the session was residing on a remote node running EMQX v5.2.x or earlier. diff --git a/changes/ee/fix-11750.en.md b/changes/ee/fix-11750.en.md new file mode 100644 index 000000000..82dcd1d1e --- /dev/null +++ b/changes/ee/fix-11750.en.md @@ -0,0 +1 @@ +Eliminated logging and tracing of HTTP request bodies in HTTP authentification and HTTP bridges. diff --git a/changes/ee/fix-11760.en.md b/changes/ee/fix-11760.en.md new file mode 100644 index 000000000..9d5861529 --- /dev/null +++ b/changes/ee/fix-11760.en.md @@ -0,0 +1 @@ +Simplified the CQL query employed for Cassandra bridge health check that was apparently the source of warnings in Cassandra server logs. diff --git a/mix.exs b/mix.exs index 308472b2a..3817b5121 100644 --- a/mix.exs +++ b/mix.exs @@ -821,7 +821,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.201", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.202", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index 39b813e53..dd9bd1b04 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -39,7 +39,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.201"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.10"}}}.