From 802361dbd088fc23fc3df1d125df2459aca64380 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 28 May 2024 14:13:21 -0300 Subject: [PATCH 01/24] fix(connector api): handle `timeout` when waiting for connector status Fixes https://emqx.atlassian.net/browse/EMQX-12251 --- apps/emqx_connector/src/emqx_connector.app.src | 2 +- apps/emqx_connector/src/emqx_connector_api.erl | 12 ++++++++++++ .../emqx_connector/test/emqx_connector_api_SUITE.erl | 10 ++++++++-- apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_resource/src/emqx_resource_manager.erl | 2 +- changes/ce/fix-13148.en.md | 1 + 6 files changed, 24 insertions(+), 5 deletions(-) create mode 100644 changes/ce/fix-13148.en.md diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 135d57dbe..ec57f2c85 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 97f68b7ef..4c7a0476e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) -> OkResult; is_ok(Error = {error, _}) -> Error; +is_ok(timeout) -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach either + %% `?status_connected' or `?status_disconnected' within `start_timeout'. + timeout; is_ok(ResL) -> case lists:filter( @@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> ?NO_CONTENT; + timeout -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach + %% either `?status_connected' or `?status_disconnected' within + %% `start_timeout'. + ?BAD_REQUEST(<< + "Timeout while waiting for connector to reach connected status." + " Please try again." + >>); {error, not_implemented} -> ?NOT_IMPLEMENTED; {error, timeout} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 2b1ada37b..0f57e9034 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -536,14 +536,20 @@ do_start_connector(TestType, Config) -> request_json( post, uri(["connectors"]), - ?KAFKA_CONNECTOR(BadName, BadServer), + (?KAFKA_CONNECTOR(BadName, BadServer))#{ + <<"resource_opts">> => #{ + <<"start_timeout">> => <<"10ms">> + } + }, Config ) ), BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName), + %% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to + %% be connected doesn't return a 500 error. ?assertMatch( %% request from product: return 400 on such errors - {ok, SC, _} when SC == 500 orelse SC == 400, + {ok, 400, _}, request(post, {operation, TestType, start, BadConnectorID}, Config) ), ok = gen_tcp:close(Sock), diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 913cc5e8c..39b8ec8d1 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.29"}, + {vsn, "0.1.30"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d650a2afb..a09a6e9f8 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -274,7 +274,7 @@ restart(ResId, Opts) when is_binary(ResId) -> end. %% @doc Start the resource --spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}. start(ResId, Opts) -> StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION), case safe_call(ResId, start, StartTimeout) of diff --git a/changes/ce/fix-13148.en.md b/changes/ce/fix-13148.en.md new file mode 100644 index 000000000..15002e132 --- /dev/null +++ b/changes/ce/fix-13148.en.md @@ -0,0 +1 @@ +Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected. From 1129c183305544077bd8562304078ad099451391 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 30 May 2024 11:36:13 +0200 Subject: [PATCH 02/24] fix(authz_http): fix content-type header in http request --- apps/emqx_authz/src/emqx_authz.app.src | 2 +- apps/emqx_authz/src/emqx_authz_http.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz.app.src b/apps/emqx_authz/src/emqx_authz.app.src index 9de573795..5c3a26eb9 100644 --- a/apps/emqx_authz/src/emqx_authz.app.src +++ b/apps/emqx_authz/src/emqx_authz.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_authz, [ {description, "An OTP application"}, - {vsn, "0.1.25"}, + {vsn, "0.1.26"}, {registered, []}, {mod, {emqx_authz_app, []}}, {applications, [ diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index a5dff322d..ffc4045c5 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -200,7 +200,7 @@ generate_request( _ -> NPath = append_query(Path, Query), NBody = serialize_body( - proplists:get_value(<<"accept">>, Headers, <<"application/json">>), + proplists:get_value(<<"Content-Type">>, Headers, <<"application/json">>), Body ), {NPath, Headers, NBody} From 0c4da98b5209d72405468adee97a29b478783bd7 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 30 May 2024 11:53:00 +0200 Subject: [PATCH 03/24] chore: update deps --- apps/emqx_authz/src/emqx_authz_http.erl | 2 +- apps/emqx_bridge_dynamo/rebar.config | 2 +- apps/emqx_bridge_hstreamdb/rebar.config | 2 +- apps/emqx_bridge_kinesis/rebar.config | 2 +- apps/emqx_s3/rebar.config | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index ffc4045c5..a34a7514a 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -200,7 +200,7 @@ generate_request( _ -> NPath = append_query(Path, Query), NBody = serialize_body( - proplists:get_value(<<"Content-Type">>, Headers, <<"application/json">>), + proplists:get_value(<<"content-type">>, Headers, <<"application/json">>), Body ), {NPath, Headers, NBody} diff --git a/apps/emqx_bridge_dynamo/rebar.config b/apps/emqx_bridge_dynamo/rebar.config index e80fb0f80..38598d313 100644 --- a/apps/emqx_bridge_dynamo/rebar.config +++ b/apps/emqx_bridge_dynamo/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}} +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0.3"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_hstreamdb/rebar.config b/apps/emqx_bridge_hstreamdb/rebar.config index fb99cd627..92b9c46cd 100644 --- a/apps/emqx_bridge_hstreamdb/rebar.config +++ b/apps/emqx_bridge_hstreamdb/rebar.config @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. {deps, [ - {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}}, + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1+ezstd-v1.0.5-emqx1"}}}, {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}} ]}. diff --git a/apps/emqx_bridge_kinesis/rebar.config b/apps/emqx_bridge_kinesis/rebar.config index e4b57846e..4d7f87540 100644 --- a/apps/emqx_bridge_kinesis/rebar.config +++ b/apps/emqx_bridge_kinesis/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}} +{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0.3"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_s3/rebar.config b/apps/emqx_s3/rebar.config index 1d64e6677..e34406e54 100644 --- a/apps/emqx_s3/rebar.config +++ b/apps/emqx_s3/rebar.config @@ -1,6 +1,6 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, - {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}, + {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0.3"}}}, {emqx_bridge_http, {path, "../emqx_bridge_http"}} ]}. From 97f9c81e19a579e2880cadd1883a4811b8a201b2 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 30 May 2024 14:56:56 +0300 Subject: [PATCH 04/24] feat(auth): add legacy ${access} placeholder --- apps/emqx_authz/src/emqx_authz_http.erl | 16 ++++++++++++++-- apps/emqx_authz/test/emqx_authz_http_SUITE.erl | 13 +++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index a34a7514a..faa3a3198 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -39,6 +39,10 @@ -compile(nowarn_export_all). -endif. +-define(PH_ACCESS, <<"${access}">>). +-define(LEGACY_SUBSCRIBE_ACTION, 1). +-define(LEGACY_PUBLISH_ACTION, 2). + -define(PLACEHOLDERS, [ ?PH_USERNAME, ?PH_CLIENTID, @@ -48,7 +52,8 @@ ?PH_TOPIC, ?PH_ACTION, ?PH_CERT_SUBJECT, - ?PH_CERT_CN_NAME + ?PH_CERT_CN_NAME, + ?PH_ACCESS ]). -define(PLACEHOLDERS_FOR_RICH_ACTIONS, [ @@ -234,7 +239,14 @@ serialize_body(<<"application/x-www-form-urlencoded">>, Body) -> client_vars(Client, Action, Topic) -> Vars = emqx_authz_utils:vars_for_rule_query(Client, Action), - Vars#{topic => Topic}. + add_legacy_access_var(Vars#{topic => Topic}). + +add_legacy_access_var(#{action := subscribe} = Vars) -> + Vars#{access => ?LEGACY_SUBSCRIBE_ACTION}; +add_legacy_access_var(#{action := publish} = Vars) -> + Vars#{access => ?LEGACY_PUBLISH_ACTION}; +add_legacy_access_var(Vars) -> + Vars. to_list(A) when is_atom(A) -> atom_to_list(A); diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index 6cf4b5bc0..7810b5902 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -202,6 +202,7 @@ t_query_params(_Config) -> mountpoint := <<"MOUNTPOINT">>, topic := <<"t/1">>, action := <<"publish">>, + access := <<"2">>, qos := <<"1">>, retain := <<"false">> } = cowboy_req:match_qs( @@ -213,6 +214,7 @@ t_query_params(_Config) -> mountpoint, topic, action, + access, qos, retain ], @@ -230,6 +232,7 @@ t_query_params(_Config) -> "mountpoint=${mountpoint}&" "topic=${topic}&" "action=${action}&" + "access=${access}&" "qos=${qos}&" "retain=${retain}" >> @@ -264,6 +267,7 @@ t_path(_Config) -> "MOUNTPOINT/" "t%2F1/" "publish/" + "2/" "1/" "false" >>, @@ -281,6 +285,7 @@ t_path(_Config) -> "${mountpoint}/" "${topic}/" "${action}/" + "${access}/" "${qos}/" "${retain}" >> @@ -321,6 +326,7 @@ t_json_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>, + <<"access">> := <<"2">>, <<"qos">> := <<"1">>, <<"retain">> := <<"false">> }, @@ -338,6 +344,7 @@ t_json_body(_Config) -> <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>, + <<"access">> => <<"${access}">>, <<"qos">> => <<"${qos}">>, <<"retain">> => <<"${retain}">> } @@ -405,7 +412,7 @@ t_placeholder_and_body(_Config) -> cowboy_req:path(Req0) ), - {ok, [{PostVars, true}], Req1} = cowboy_req:read_urlencoded_body(Req0), + {ok, PostVars, Req1} = cowboy_req:read_urlencoded_body(Req0), ?assertMatch( #{ @@ -416,10 +423,11 @@ t_placeholder_and_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>, + <<"access">> := <<"2">>, <<"CN">> := ?PH_CERT_CN_NAME, <<"CS">> := ?PH_CERT_SUBJECT }, - emqx_utils_json:decode(PostVars, [return_maps]) + maps:from_list(PostVars) ), {ok, ?AUTHZ_HTTP_RESP(allow, Req1), State} end, @@ -433,6 +441,7 @@ t_placeholder_and_body(_Config) -> <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>, + <<"access">> => <<"${access}">>, <<"CN">> => ?PH_CERT_CN_NAME, <<"CS">> => ?PH_CERT_SUBJECT }, From f681126f4dbff5a18ba1194fba96dff7c0307308 Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 30 May 2024 22:38:56 +0200 Subject: [PATCH 05/24] docs: add changelog for PR 13164 --- changes/ce/fix-13164.en.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changes/ce/fix-13164.en.md diff --git a/changes/ce/fix-13164.en.md b/changes/ce/fix-13164.en.md new file mode 100644 index 000000000..c0ce937da --- /dev/null +++ b/changes/ce/fix-13164.en.md @@ -0,0 +1,6 @@ +Fix HTTP authorization request body encoding. + +Prior to this fix, the HTTP authorization request body encoding format was taken from the `accept` header. +The fix is to respect the `content-type` header instead. +Also added `access` templating variable for v4 compatibility. +The access code of SUBSCRIBE action is `1` and SUBSCRIBE action is `2`. From 45e2c81ebcd3ed8edc3d513c507c8ac1ffd39357 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 31 May 2024 17:51:47 +0800 Subject: [PATCH 06/24] fix: missing rule event i18n in zh_cn --- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- apps/emqx_rule_engine/src/emqx_rule_events.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1fed922dd..0535586b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.1.0"}, + {vsn, "5.1.1"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 3fb2ba803..45085caf8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -690,8 +690,8 @@ event_info() -> event_info_schema_validation_failed() -> event_info_common( 'schema.validation_failed', - {<<"schema validation failed">>, <<"TODO"/utf8>>}, - {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, + {<<"schema validation failed">>, <<"schema 验证失败"/utf8>>}, + {<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>}, <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">> ). ee_event_info() -> From 6fe8a09e97d8bfc0b6aff640072f188ca167e0de Mon Sep 17 00:00:00 2001 From: zmstone Date: Fri, 31 May 2024 14:32:14 +0200 Subject: [PATCH 07/24] fix(authz/http): rename PH_ACCESS to VAR_ACCESS --- apps/emqx_auth_http/src/emqx_authz_http.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index c5e839472..49296f690 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -38,7 +38,7 @@ -compile(nowarn_export_all). -endif. --define(PH_ACCESS, <<"${access}">>). +-define(VAR_ACCESS, "access"). -define(LEGACY_SUBSCRIBE_ACTION, 1). -define(LEGACY_PUBLISH_ACTION, 2). @@ -52,7 +52,7 @@ ?VAR_ACTION, ?VAR_CERT_SUBJECT, ?VAR_CERT_CN_NAME, - ?PH_ACCESS, + ?VAR_ACCESS, ?VAR_NS_CLIENT_ATTRS ]). From 9637b1454329db630b97e5fba60c9d895de3d521 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 22 May 2024 18:09:38 +0200 Subject: [PATCH 08/24] fix: upgrade to brod 3.18.0 to fix unnecessary rebalance --- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- apps/emqx_bridge_confluent/rebar.config | 2 +- apps/emqx_bridge_kafka/rebar.config | 2 +- changes/ee/fix-13093.en.md | 3 +++ mix.exs | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) create mode 100644 changes/ee/fix-13093.en.md diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 8cd5ee427..5236d9a0e 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index a969ac83b..a3997ada7 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index fd7f03da8..78569b321 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/changes/ee/fix-13093.en.md b/changes/ee/fix-13093.en.md new file mode 100644 index 000000000..ebae132f1 --- /dev/null +++ b/changes/ee/fix-13093.en.md @@ -0,0 +1,3 @@ +Improve Kafka consumer group stability. + +Prior to this change, Kafka consumer group sometimes may need to rebalance twice after Kafka group coordinator restart. diff --git a/mix.exs b/mix.exs index e39fa6dd4..e742a065a 100644 --- a/mix.exs +++ b/mix.exs @@ -213,7 +213,7 @@ defmodule EMQXUmbrella.MixProject do {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, - {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, + {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:snappyer, "1.2.9", override: true}, {:crc32cer, "0.1.8", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, From 8d7417d51c48ae2ba8348f45d37853b3964f2fe4 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sun, 2 Jun 2024 10:10:17 +0200 Subject: [PATCH 09/24] chore: stop pinning supervisor in mix.exs --- mix.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index e742a065a..5432b64ae 100644 --- a/mix.exs +++ b/mix.exs @@ -102,8 +102,7 @@ defmodule EMQXUmbrella.MixProject do {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true}, {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}, {:ra, "2.7.3", override: true}, - {:mimerl, "1.2.0", override: true}, - {:supervisor3, "1.1.12", override: true} + {:mimerl, "1.2.0", override: true} ] ++ emqx_apps(profile_info, version) ++ enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep() From 21624bc865203080fc6d55b86e00a3518b9671fa Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 28 May 2024 16:30:16 +0800 Subject: [PATCH 10/24] fix: get plugin app vsn from field `rel_apps` --- apps/emqx_plugins/src/emqx_plugins.app.src | 2 +- apps/emqx_plugins/src/emqx_plugins.erl | 27 +++++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index 6501d5654..b53aef959 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx, erlavro]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 961855e17..baad35af5 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -136,6 +136,19 @@ parse_name_vsn(NameVsn) when is_list(NameVsn) -> make_name_vsn_string(Name, Vsn) -> binary_to_list(iolist_to_binary([Name, "-", Vsn])). +app_dir(AppName, Apps) -> + case + lists:filter( + fun(AppNameVsn) -> nomatch =/= string:prefix(AppNameVsn, AppName) end, + Apps + ) + of + [AppNameVsn] -> + {ok, AppNameVsn}; + _ -> + {error, not_found} + end. + %%-------------------------------------------------------------------- %% Package operations @@ -1372,12 +1385,14 @@ plugin_dir(NameVsn) -> -spec plugin_priv_dir(name_vsn()) -> string(). plugin_priv_dir(NameVsn) -> - case read_plugin_info(NameVsn, #{fill_readme => false}) of - {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> - AppDir = make_name_vsn_string(Name, Vsn), - wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); - _ -> - wrap_to_list(filename:join([install_dir(), NameVsn, "priv"])) + maybe + {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ?= + read_plugin_info(NameVsn, #{fill_readme => false}), + {ok, AppDir} ?= app_dir(Name, Apps), + wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"])) + else + %% Otherwise assume the priv directory is under the plugin root directory + _ -> wrap_to_list(filename:join([install_dir(), NameVsn, "priv"])) end. -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. From 95c23fb3add3d510deff3a53da16621d55360744 Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Mon, 3 Jun 2024 17:31:23 +0800 Subject: [PATCH 11/24] chore(dashboard): bump dashboard version to v1.9.1-beta.1 & e1.7.1-beta.1 --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index cbff37ecd..48377475e 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.9.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0 +export EMQX_DASHBOARD_VERSION ?= v1.9.1-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1-beta.1 -include default-profile.mk PROFILE ?= emqx From e3ed7b59dd707872c39a9453db0b8f823bf613b8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sun, 26 May 2024 01:03:20 +0200 Subject: [PATCH 12/24] feat(redis): add a rule function to help formatting redis args The new function named 'map_to_redis_hset_args' can be used to format a map's key-value pairs into redis HSET (or HMSET) arg list. This new function is dedicated for redis to avoid abuse for other data integrations. --- .../src/emqx_bridge_redis_connector.erl | 37 ++++++++++++++----- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 33 +++++++++++++++++ .../test/emqx_rule_funcs_SUITE.erl | 21 +++++++++++ apps/emqx_utils/src/emqx_placeholder.erl | 3 +- apps/emqx_utils/src/emqx_utils.app.src | 2 +- 5 files changed, 85 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index e12155bb1..f117c4e7a 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -128,8 +128,8 @@ on_query( #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result} ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. on_batch_query( @@ -165,8 +165,8 @@ on_batch_query( } ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. trace_format_commands(Commands0) -> @@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) -> end. proc_command_template(CommandTemplate, Msg) -> - lists:map( - fun(ArgTks) -> - emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) - end, - CommandTemplate + lists:reverse( + lists:foldl( + fun(ArgTks, Acc) -> + New = proc_tmpl(ArgTks, Msg), + lists:reverse(New, Acc) + end, + [], + CommandTemplate + ) ). preproc_command_template(CommandTemplate) -> @@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) -> fun emqx_placeholder:preproc_tmpl/1, CommandTemplate ). + +%% This function mimics emqx_placeholder:proc_tmpl/3 but with an +%% injected special handling of map_to_redis_hset_args result +%% which is a list of redis command args (all in binary string format) +proc_tmpl([{var, Phld}], Data) -> + case emqx_placeholder:lookup_var(Phld, Data) of + [map_to_redis_hset_args | L] -> + L; + Other -> + [emqx_utils_conv:bin(Other)] + end; +proc_tmpl(Tokens, Data) -> + %% more than just a var ref, but a string, or a concatenation of string and a var + %% this is must be a single arg, format it into a binary + [emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 604f43d82..ed838e6d1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -160,6 +160,7 @@ find/3, join_to_string/1, join_to_string/2, + map_to_redis_hset_args/1, join_to_sql_values_string/1, jq/2, jq/3, @@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str). join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List). +%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields. +%% Notes: +%% - Non-string keys in the input map are dropped +%% - Keys are not quoted +%% - String values are always quoted +%% - No escape sequence for keys and values +%% - Float point values are formatted with fixed (6) decimal point compact-formatting +map_to_redis_hset_args(Map) when erlang:is_map(Map) -> + [map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)]. + +redis_hset_acc(K, V, IoData) -> + try + [redis_field_name(K), redis_field_value(V) | IoData] + catch + _:_ -> + IoData + end. + +redis_field_name(K) when erlang:is_binary(K) -> + K; +redis_field_name(K) -> + throw({bad_redis_field_name, K}). + +redis_field_value(V) when erlang:is_binary(V) -> + iolist_to_binary([$", V, $"]); +redis_field_value(V) when erlang:is_integer(V) -> + integer_to_binary(V); +redis_field_value(V) when erlang:is_float(V) -> + float2str(V, 6); +redis_field_value(V) when erlang:is_boolean(V) -> + atom_to_binary(V). + join_to_sql_values_string(List) -> QuotedList = [ diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index e260b04e1..fab105f7b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1376,6 +1376,27 @@ t_parse_date_errors(_) -> ok. +t_map_to_redis_hset_args(_Config) -> + Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end, + ?assertEqual([], Do(#{})), + ?assertEqual([], Do(#{1 => 2})), + ?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})), + ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})), + ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})), + ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})), + ?assertEqual([<<"a">>, <<"\"\"">>], Do(#{<<"a">> => <<"">>})), + ?assertEqual([<<"a">>, <<"\"i j\"">>], Do(#{<<"a">> => <<"i j">>})), + %% no determined ordering + ?assert( + case Do(#{<<"a">> => 1, <<"b">> => 2}) of + [<<"a">>, <<"1">>, <<"b">>, <<"2">>] -> + true; + [<<"b">>, <<"2">>, <<"a">>, <<"1">>] -> + true + end + ), + ok. + %%------------------------------------------------------------------------------ %% Utility functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 84000669d..ddc32cd0d 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -37,7 +37,8 @@ proc_tmpl_deep/3, bin/1, - sql_data/1 + sql_data/1, + lookup_var/2 ]). -export([ diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 8aef21479..bac23cefb 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.0"}, + {vsn, "5.2.1"}, {modules, [ emqx_utils, emqx_utils_api, From aa7ce1f64121385ebe0f21d2db5969acb2da3d17 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 3 Jun 2024 23:03:15 +0200 Subject: [PATCH 13/24] fix(bridge/redis): add test case for map_to_redis_hset_args --- .../test/emqx_bridge_v2_redis_SUITE.erl | 48 +++++++++++++++---- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 2 +- .../test/emqx_rule_funcs_SUITE.erl | 4 +- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index f0fb8872d..7d3003bfa 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -46,7 +46,8 @@ matrix_testcases() -> t_start_stop, t_create_via_http, t_on_get_status, - t_sync_query + t_sync_query, + t_map_to_redis_hset_args ]. init_per_suite(Config) -> @@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) -> Path = group_path(Config), ct:comment(Path), ConnectorConfig = connector_config(Name, Path, NConfig), - BridgeConfig = action_config(Name, Path, Name), + BridgeConfig = action_config(Name, Path, Name, TestCase), ok = snabbkaffe:start_trace(), [ {connector_type, ?CONNECTOR_TYPE}, @@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) -> ct:pal("parsed config: ~p", [Config]), InnerConfigMap. -action_config(Name, Path, ConnectorId) -> +action_config(Name, Path, ConnectorId, TestCase) -> + Template = + try + ?MODULE:TestCase(command_template) + catch + _:_ -> + [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>] + end, [RedisType, _Transport | _] = Path, CommonCfg = #{ @@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) -> <<"connector">> => ConnectorId, <<"parameters">> => #{ - <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>], + <<"command_template">> => Template, <<"redis_type">> => atom_to_binary(RedisType) }, <<"local_topic">> => <<"t/redis">>, @@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) -> emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap). make_message() -> - ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + make_message_with_payload(Payload). + +make_message_with_payload(Payload) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), #{ clientid => ClientId, payload => Payload, @@ -290,7 +301,7 @@ t_start_stop(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_start_stop(Config) -> +t_start_stop(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped), ok. @@ -300,7 +311,7 @@ t_create_via_http(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_create_via_http(Config) -> +t_create_via_http(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_create_via_http(Config), ok. @@ -310,7 +321,7 @@ t_on_get_status(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_on_get_status(Config) -> +t_on_get_status(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. @@ -320,7 +331,7 @@ t_sync_query(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_sync_query(Config) -> +t_sync_query(Config) when is_list(Config) -> ok = emqx_bridge_v2_testlib:t_sync_query( Config, fun make_message/0, @@ -328,3 +339,22 @@ t_sync_query(Config) -> redis_bridge_connector_send_done ), ok. + +t_map_to_redis_hset_args(matrix) -> + {map_to_redis_hset_args, [ + [single, tcp], + [sentinel, tcp], + [cluster, tcp] + ]}; +t_map_to_redis_hset_args(command_template) -> + [<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>]; +t_map_to_redis_hset_args(Config) when is_list(Config) -> + Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}), + MsgFn = fun() -> make_message_with_payload(Payload) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + MsgFn, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + redis_bridge_connector_send_done + ), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index ed838e6d1..9de7b0173 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -839,7 +839,7 @@ redis_field_name(K) -> throw({bad_redis_field_name, K}). redis_field_value(V) when erlang:is_binary(V) -> - iolist_to_binary([$", V, $"]); + V; redis_field_value(V) when erlang:is_integer(V) -> integer_to_binary(V); redis_field_value(V) when erlang:is_float(V) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index fab105f7b..eb7b97b5f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1384,8 +1384,8 @@ t_map_to_redis_hset_args(_Config) -> ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})), ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})), ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})), - ?assertEqual([<<"a">>, <<"\"\"">>], Do(#{<<"a">> => <<"">>})), - ?assertEqual([<<"a">>, <<"\"i j\"">>], Do(#{<<"a">> => <<"i j">>})), + ?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})), + ?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})), %% no determined ordering ?assert( case Do(#{<<"a">> => 1, <<"b">> => 2}) of From e7fecd5e91bcb8a23fd391f443247d91f5297079 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 3 Jun 2024 23:13:13 +0200 Subject: [PATCH 14/24] docs: add changelog for PR 13172 --- changes/ee/feat-13172.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ee/feat-13172.en.md diff --git a/changes/ee/feat-13172.en.md b/changes/ee/feat-13172.en.md new file mode 100644 index 000000000..259dc075a --- /dev/null +++ b/changes/ee/feat-13172.en.md @@ -0,0 +1,5 @@ +Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values. + +For example, if `payload.value` is a map of multiple data fields, +this rule `SELECT map_to_redis_hset_args(payload.value) as hset_fields FROM "t/#"` can prepare `hset_fields` +for redis action to render the command template like `HMSET name1 ${hset_fields}`. From e4e53844d54bd885366bfd25e039243b518221d4 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 7 May 2024 16:24:31 +0800 Subject: [PATCH 15/24] feat(events): add new hook && event `client.check_authn_complete` --- apps/emqx/src/emqx_access_control.erl | 21 ++++--- apps/emqx/src/emqx_hookpoints.erl | 1 + .../src/emqx_rule_api_schema.erl | 12 ++++ .../emqx_rule_engine/src/emqx_rule_events.erl | 63 +++++++++++++++++++ .../test/emqx_rule_engine_SUITE.erl | 15 ++++- .../test/emqx_rule_engine_api_2_SUITE.erl | 15 +++++ .../emqx_rule_engine_api_rule_test_SUITE.erl | 22 +++++++ rel/i18n/emqx_rule_api_schema.hocon | 6 ++ 8 files changed, 147 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 016386011..9d2cbfe90 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -56,21 +56,21 @@ authenticate(Credential) -> NotSuperUser = #{is_superuser => false}, case pre_hook_authenticate(Credential) of ok -> - inc_authn_metrics(anonymous), + on_authentication_complete(Credential, NotSuperUser, anonymous), {ok, NotSuperUser}; continue -> case run_hooks('client.authenticate', [Credential], ignore) of ignore -> - inc_authn_metrics(anonymous), + on_authentication_complete(Credential, NotSuperUser, anonymous), {ok, NotSuperUser}; ok -> - inc_authn_metrics(ok), + on_authentication_complete(Credential, NotSuperUser, ok), {ok, NotSuperUser}; - {ok, _AuthResult} = OkResult -> - inc_authn_metrics(ok), + {ok, AuthResult} = OkResult -> + on_authentication_complete(Credential, AuthResult, ok), OkResult; - {ok, _AuthResult, _AuthData} = OkResult -> - inc_authn_metrics(ok), + {ok, AuthResult, _AuthData} = OkResult -> + on_authentication_complete(Credential, AuthResult, ok), OkResult; {error, _Reason} = Error -> inc_authn_metrics(error), @@ -240,3 +240,10 @@ inc_authn_metrics(ok) -> inc_authn_metrics(anonymous) -> emqx_metrics:inc('authentication.success.anonymous'), emqx_metrics:inc('authentication.success'). + +on_authentication_complete(Credential, Result, Type) -> + emqx_hooks:run( + 'client.check_authn_complete', + [Credential, Result#{is_anonymous => (Type =:= anonymous)}] + ), + inc_authn_metrics(Type). diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl index 0fcf76f3f..e33896719 100644 --- a/apps/emqx/src/emqx_hookpoints.erl +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -44,6 +44,7 @@ 'client.disconnected', 'client.authorize', 'client.check_authz_complete', + 'client.check_authn_complete', 'client.authenticate', 'client.subscribe', 'client.unsubscribe', diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index a9f65b0fa..117340549 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -282,6 +282,17 @@ fields("ctx_check_authz_complete") -> {"authz_source", sc(binary(), #{desc => ?DESC("event_authz_source")})}, {"result", sc(binary(), #{desc => ?DESC("event_result")})} ]; +fields("ctx_check_authn_complete") -> + Event = 'client.check_authn_complete', + [ + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, + {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, + {"username", sc(binary(), #{desc => ?DESC("event_username")})}, + {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, + {"is_anonymous", sc(boolean(), #{desc => ?DESC("event_is_anonymous")})}, + {"is_superuser", sc(boolean(), #{desc => ?DESC("event_is_superuser")})} + ]; fields("ctx_bridge_mqtt") -> Event = '$bridges/mqtt:*', EventBin = atom_to_binary(Event), @@ -330,6 +341,7 @@ rule_input_message_context() -> ref("ctx_disconnected"), ref("ctx_connack"), ref("ctx_check_authz_complete"), + ref("ctx_check_authn_complete"), ref("ctx_bridge_mqtt"), ref("ctx_delivery_dropped"), ref("ctx_schema_validation_failed") diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 45085caf8..937132c10 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -40,6 +40,7 @@ on_client_disconnected/4, on_client_connack/4, on_client_check_authz_complete/6, + on_client_check_authn_complete/3, on_session_subscribed/4, on_session_unsubscribed/4, on_message_publish/2, @@ -182,6 +183,18 @@ on_client_check_authz_complete( Conf ). +on_client_check_authn_complete(ClientInfo, Result, Conf) -> + apply_event( + 'client.check_authn_complete', + fun() -> + eventmsg_check_authn_complete( + ClientInfo, + Result + ) + end, + Conf + ). + on_client_disconnected(ClientInfo, Reason, ConnInfo, Conf) -> apply_event( 'client.disconnected', @@ -438,6 +451,27 @@ eventmsg_check_authz_complete( #{} ). +eventmsg_check_authn_complete( + _ClientInfo = #{ + clientid := ClientId, + username := Username, + peerhost := PeerHost + }, + #{is_anonymous := IsAnonymous} = Result +) -> + IsSuperuser = maps:get(is_superuser, Result, false), + with_basic_columns( + 'client.check_authn_complete', + #{ + clientid => ClientId, + username => Username, + peerhost => ntoa(PeerHost), + is_anonymous => IsAnonymous, + is_superuser => IsSuperuser + }, + #{} + ). + eventmsg_sub_or_unsub( Event, _ClientInfo = #{ @@ -679,6 +713,7 @@ event_info() -> event_info_client_disconnected(), event_info_client_connack(), event_info_client_check_authz_complete(), + event_info_client_check_authn_complete(), event_info_session_subscribed(), event_info_session_unsubscribed(), event_info_delivery_dropped(), @@ -770,6 +805,13 @@ event_info_client_check_authz_complete() -> {<<"client check authz complete">>, <<"授权结果"/utf8>>}, <<"SELECT * FROM \"$events/client_check_authz_complete\"">> ). +event_info_client_check_authn_complete() -> + event_info_common( + 'client.check_authn_complete', + {<<"client check authn complete">>, <<"认证结果"/utf8>>}, + {<<"client check authn complete">>, <<"认证结果"/utf8>>}, + <<"SELECT * FROM \"$events/client_check_authn_complete\"">> + ). event_info_session_subscribed() -> event_info_common( 'session.subscribed', @@ -854,6 +896,13 @@ test_columns('client.check_authz_complete') -> {<<"action">>, [<<"publish">>, <<"the action of publish or subscribe">>]}, {<<"result">>, [<<"allow">>, <<"the authz check complete result">>]} ]; +test_columns('client.check_authn_complete') -> + [ + {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, + {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, + {<<"is_superuser">>, [true, <<"Whether this is a superuser">>]}, + {<<"is_anonymous">>, [false, <<"Whether this is a superuser">>]} + ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> @@ -1023,6 +1072,17 @@ columns_with_exam('client.check_authz_complete') -> {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; +columns_with_exam('client.check_authn_complete') -> + [ + {<<"event">>, 'client.check_authz_complete'}, + {<<"clientid">>, <<"c_emqx">>}, + {<<"username">>, <<"u_emqx">>}, + {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"is_superuser">>, true}, + {<<"is_anonymous">>, false}, + {<<"timestamp">>, erlang:system_time(millisecond)}, + {<<"node">>, node()} + ]; columns_with_exam('session.subscribed') -> [columns_example_props(sub_props)] ++ columns_message_sub_unsub('session.subscribed'); columns_with_exam('session.unsubscribed') -> @@ -1124,6 +1184,7 @@ hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6; +hook_fun('client.check_authn_complete') -> fun ?MODULE:on_client_check_authn_complete/3; hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4; hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; @@ -1149,6 +1210,7 @@ event_name(<<"$events/client_connected">>) -> 'client.connected'; event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; event_name(<<"$events/client_connack">>) -> 'client.connack'; event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete'; +event_name(<<"$events/client_check_authn_complete">>) -> 'client.check_authn_complete'; event_name(<<"$events/session_subscribed">>) -> 'session.subscribed'; event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; event_name(<<"$events/message_delivered">>) -> 'message.delivered'; @@ -1163,6 +1225,7 @@ event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; event_topic('client.check_authz_complete') -> <<"$events/client_check_authz_complete">>; +event_topic('client.check_authn_complete') -> <<"$events/client_check_authn_complete">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index a84ead1c2..395883e7b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -237,6 +237,7 @@ init_per_testcase(t_events, Config) -> "\"$events/client_disconnected\", " "\"$events/client_connack\", " "\"$events/client_check_authz_complete\", " + "\"$events/client_check_authn_complete\", " "\"$events/session_subscribed\", " "\"$events/session_unsubscribed\", " "\"$events/message_acked\", " @@ -1084,6 +1085,7 @@ client_connected(Client, Client2) -> {ok, _} = emqtt:connect(Client2), verify_event('client.connack'), verify_event('client.connected'), + verify_event('client.check_authn_complete'), ok. client_disconnected(Client, Client2) -> ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), @@ -4196,7 +4198,18 @@ verify_event_fields('client.check_authz_complete', Fields) -> ]) ), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), - ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])). + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])); +verify_event_fields('client.check_authn_complete', Fields) -> + #{ + clientid := ClientId, + username := Username, + is_anonymous := IsAnonymous, + is_superuser := IsSuperuser + } = Fields, + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + ?assert(erlang:is_boolean(IsAnonymous)), + ?assert(erlang:is_boolean(IsSuperuser)). verify_peername(PeerName) -> case string:split(PeerName, ":") of diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl index f761197f0..6d62856f9 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl @@ -265,6 +265,21 @@ t_rule_test_smoke(_Config) -> <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> } }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"client_check_authn_complete">>, + <<"is_superuser">> => true, + <<"is_anonymous">> => false, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, #{ expected => #{code => 412}, input => diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl index 5282e3e0e..ed8daf9c5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl @@ -195,6 +195,28 @@ t_ctx_check_authz_complete(_) -> do_test(SQL, Context, Expected). +t_ctx_check_authn_complete(_) -> + SQL = + << + "SELECT clientid, username, is_superuser, is_anonymous\n" + "FROM \"$events/client_check_authn_complete\"" + >>, + + Context = + #{ + clientid => <<"c_emqx">>, + event_type => client_check_authn_complete, + is_superuser => true, + is_anonymous => false + }, + Expected = check_result( + [clientid, username, is_superuser, is_anonymous], + [], + Context + ), + + do_test(SQL, Context, Expected). + t_ctx_delivery_dropped(_) -> SQL = <<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>, diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index 668e2581a..53b15f8c6 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -390,4 +390,10 @@ event_ctx_disconnected_reason.desc: event_ctx_disconnected_reason.label: """Disconnect Reason""" +event_is_anonymous.desc: +"""True if this user is anonymous.""" + +event_is_superuser.desc: +"""True if this is a super user.""" + } From 92d26ff27b0f4fbe1a37fed6daaa5cc3087b1b9a Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 7 May 2024 16:52:37 +0800 Subject: [PATCH 16/24] chore: update change --- apps/emqx_rule_engine/src/emqx_rule_api_schema.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_events.erl | 7 ++++--- changes/ce/feat-12983.en.md | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 changes/ce/feat-12983.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 117340549..2eac6644c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -289,7 +289,7 @@ fields("ctx_check_authn_complete") -> {"event", event_sc(Event)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, - {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, + {"peername", sc(binary(), #{desc => ?DESC("event_peername")})}, {"is_anonymous", sc(boolean(), #{desc => ?DESC("event_is_anonymous")})}, {"is_superuser", sc(boolean(), #{desc => ?DESC("event_is_superuser")})} ]; diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 937132c10..58a0305c1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -455,7 +455,8 @@ eventmsg_check_authn_complete( _ClientInfo = #{ clientid := ClientId, username := Username, - peerhost := PeerHost + peerhost := PeerHost, + peerport := PeerPort }, #{is_anonymous := IsAnonymous} = Result ) -> @@ -465,7 +466,7 @@ eventmsg_check_authn_complete( #{ clientid => ClientId, username => Username, - peerhost => ntoa(PeerHost), + peername => ntoa({PeerHost, PeerPort}), is_anonymous => IsAnonymous, is_superuser => IsSuperuser }, @@ -1077,7 +1078,7 @@ columns_with_exam('client.check_authn_complete') -> {<<"event">>, 'client.check_authz_complete'}, {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, - {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"peername">>, <<"192.168.0.10:56431">>}, {<<"is_superuser">>, true}, {<<"is_anonymous">>, false}, {<<"timestamp">>, erlang:system_time(millisecond)}, diff --git a/changes/ce/feat-12983.en.md b/changes/ce/feat-12983.en.md new file mode 100644 index 000000000..b531bfa89 --- /dev/null +++ b/changes/ce/feat-12983.en.md @@ -0,0 +1 @@ +Add new rule engine event `$events/client_check_authn_complete` for authentication completion event. From 87384cae32cddb7e394894d0e9200c15fcd9f4ad Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 9 May 2024 18:15:32 +0800 Subject: [PATCH 17/24] fix(events): call `client.check_authn_complete` even if authentication fails --- apps/emqx/src/emqx_access_control.erl | 27 +++++++++++++++---- .../src/emqx_rule_api_schema.erl | 5 ++-- .../emqx_rule_engine/src/emqx_rule_events.erl | 18 +++++++++++-- .../test/emqx_rule_engine_api_2_SUITE.erl | 1 + .../emqx_rule_engine_api_rule_test_SUITE.erl | 1 + rel/i18n/emqx_rule_api_schema.hocon | 3 +++ 6 files changed, 46 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 9d2cbfe90..e3c730cd5 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -72,15 +72,15 @@ authenticate(Credential) -> {ok, AuthResult, _AuthData} = OkResult -> on_authentication_complete(Credential, AuthResult, ok), OkResult; - {error, _Reason} = Error -> - inc_authn_metrics(error), + {error, Reason} = Error -> + on_authentication_complete(Credential, Reason, error), Error; %% {continue, AuthCache} | {continue, AuthData, AuthCache} Other -> Other end; - {error, _Reason} = Error -> - inc_authn_metrics(error), + {error, Reason} = Error -> + on_authentication_complete(Credential, Reason, error), Error end. @@ -241,9 +241,26 @@ inc_authn_metrics(anonymous) -> emqx_metrics:inc('authentication.success.anonymous'), emqx_metrics:inc('authentication.success'). +on_authentication_complete(Credential, Reason, error) -> + emqx_hooks:run( + 'client.check_authn_complete', + [ + Credential, + #{ + reason_code => Reason + } + ] + ), + inc_authn_metrics(error); on_authentication_complete(Credential, Result, Type) -> emqx_hooks:run( 'client.check_authn_complete', - [Credential, Result#{is_anonymous => (Type =:= anonymous)}] + [ + Credential, + Result#{ + reason_code => success, + is_anonymous => (Type =:= anonymous) + } + ] ), inc_authn_metrics(Type). diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 2eac6644c..2450253c1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -289,9 +289,10 @@ fields("ctx_check_authn_complete") -> {"event", event_sc(Event)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, + {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_authn_reason_code")})}, {"peername", sc(binary(), #{desc => ?DESC("event_peername")})}, - {"is_anonymous", sc(boolean(), #{desc => ?DESC("event_is_anonymous")})}, - {"is_superuser", sc(boolean(), #{desc => ?DESC("event_is_superuser")})} + {"is_anonymous", sc(boolean(), #{desc => ?DESC("event_is_anonymous"), required => false})}, + {"is_superuser", sc(boolean(), #{desc => ?DESC("event_is_superuser"), required => false})} ]; fields("ctx_bridge_mqtt") -> Event = '$bridges/mqtt:*', diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 58a0305c1..4f0214a9d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -458,15 +458,22 @@ eventmsg_check_authn_complete( peerhost := PeerHost, peerport := PeerPort }, - #{is_anonymous := IsAnonymous} = Result + Result ) -> - IsSuperuser = maps:get(is_superuser, Result, false), + #{ + reason_code := Reason, + is_superuser := IsSuperuser, + is_anonymous := IsAnonymous + } = maps:merge( + #{is_anonymous => false, is_superuser => false}, Result + ), with_basic_columns( 'client.check_authn_complete', #{ clientid => ClientId, username => Username, peername => ntoa({PeerHost, PeerPort}), + reason_code => force_to_bin(Reason), is_anonymous => IsAnonymous, is_superuser => IsSuperuser }, @@ -901,6 +908,7 @@ test_columns('client.check_authn_complete') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}, + {<<"reason_code">>, [<<"sucess">>, <<"the reason code">>]}, {<<"is_superuser">>, [true, <<"Whether this is a superuser">>]}, {<<"is_anonymous">>, [false, <<"Whether this is a superuser">>]} ]; @@ -1079,6 +1087,7 @@ columns_with_exam('client.check_authn_complete') -> {<<"clientid">>, <<"c_emqx">>}, {<<"username">>, <<"u_emqx">>}, {<<"peername">>, <<"192.168.0.10:56431">>}, + {<<"reason_code">>, <<"sucess">>}, {<<"is_superuser">>, true}, {<<"is_anonymous">>, false}, {<<"timestamp">>, erlang:system_time(millisecond)}, @@ -1201,6 +1210,11 @@ reason({shutdown, Reason}) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. +force_to_bin(Bin) when is_binary(Bin) -> + Bin; +force_to_bin(Term) -> + emqx_utils_conv:bin(io_lib:format("~p", [Term])). + ntoa(undefined) -> undefined; ntoa(IpOrIpPort) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl index 6d62856f9..7ebb673a8 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl @@ -273,6 +273,7 @@ t_rule_test_smoke(_Config) -> #{ <<"clientid">> => <<"c_emqx">>, <<"event_type">> => <<"client_check_authn_complete">>, + <<"reason_code">> => <<"sucess">>, <<"is_superuser">> => true, <<"is_anonymous">> => false, <<"username">> => <<"u_emqx">> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl index ed8daf9c5..3d3dabae0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl @@ -206,6 +206,7 @@ t_ctx_check_authn_complete(_) -> #{ clientid => <<"c_emqx">>, event_type => client_check_authn_complete, + reason_code => <<"sucess">>, is_superuser => true, is_anonymous => false }, diff --git a/rel/i18n/emqx_rule_api_schema.hocon b/rel/i18n/emqx_rule_api_schema.hocon index 53b15f8c6..18d0990a2 100644 --- a/rel/i18n/emqx_rule_api_schema.hocon +++ b/rel/i18n/emqx_rule_api_schema.hocon @@ -396,4 +396,7 @@ event_is_anonymous.desc: event_is_superuser.desc: """True if this is a super user.""" +event_ctx_authn_reason_code.desc: +"""The reason code""" + } From c07bc68e6fb8f486267eb33ffd4409801c40aedc Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 3 Jun 2024 14:54:44 -0300 Subject: [PATCH 18/24] feat(postgres): add `disable_prepared_statements` option Fixes https://emqx.atlassian.net/browse/EMQX-12496 Some Postgres connections, such ones made to [PGBouncer](https://www.pgbouncer.org/) or [Supabase in Transaction Mode](https://supabase.com/), do not support some session features like prepared statements. --- .../test/emqx_authn_postgresql_SUITE.erl | 1 + .../test/emqx_authz_postgresql_SUITE.erl | 1 + .../src/emqx_bridge_matrix.app.src | 2 +- .../src/emqx_bridge_matrix_action_info.erl | 9 +- .../src/emqx_bridge_pgsql.app.src | 2 +- .../src/emqx_bridge_pgsql.erl | 8 +- .../src/emqx_bridge_pgsql_action_info.erl | 20 ++++- .../test/emqx_bridge_v2_pgsql_SUITE.erl | 57 +++++++++++-- .../src/emqx_bridge_timescale.app.src | 2 +- .../src/emqx_bridge_timescale_action_info.erl | 9 +- .../src/emqx_postgresql.app.src | 2 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 85 ++++++++++++++----- .../emqx_postgresql_connector_schema.erl | 5 +- changes/ce/feat-13175.en.md | 3 + rel/i18n/emqx_postgresql.hocon | 9 ++ scripts/spellcheck/dicts/emqx.txt | 2 + 16 files changed, 179 insertions(+), 38 deletions(-) create mode 100644 changes/ce/feat-13175.en.md diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 3cf991d77..4f01ab34a 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -606,6 +606,7 @@ pgsql_server() -> pgsql_config() -> #{ auto_reconnect => true, + disable_prepared_statements => false, database => <<"mqtt">>, username => <<"root">>, password => <<"public">>, diff --git a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl index 84b859e62..9365604fc 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl @@ -426,6 +426,7 @@ setup_config(SpecialParams) -> pgsql_config() -> #{ auto_reconnect => true, + disable_prepared_statements => false, database => <<"mqtt">>, username => <<"root">>, password => <<"public">>, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 80864ba67..c7f2c2bf8 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_matrix, [ {description, "EMQX Enterprise MatrixDB Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl index 61a860db9..25b6e7699 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> matrix. @@ -20,3 +21,9 @@ action_type_name() -> matrix. connector_type_name() -> matrix. schema_module() -> emqx_bridge_matrix. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ). diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index d4f97a721..d223a2488 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pgsql, [ {description, "EMQX Enterprise PostgreSQL Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 5a0b9eb5b..d26b37e96 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -82,6 +82,7 @@ fields("get_bridge_v2") -> fields("post_bridge_v2") -> fields("post", pgsql, pgsql_action); fields("config") -> + %% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale) [ {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, @@ -95,8 +96,11 @@ fields("config") -> #{desc => ?DESC("local_topic"), default => undefined} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ - (emqx_postgresql:fields(config) -- - emqx_connector_schema_lib:prepare_statement_fields()); + proplists:delete( + disable_prepared_statements, + emqx_postgresql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields() + ); fields("post") -> fields("post", ?ACTION_TYPE, "config"); fields("put") -> diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl index 55d6d156b..0d10bbbcc 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> pgsql. @@ -20,3 +21,20 @@ action_type_name() -> pgsql. connector_type_name() -> pgsql. schema_module() -> emqx_bridge_pgsql. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ), + maps:with(bridge_v1_fields(), Config0). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +bridge_v1_fields() -> + [ + emqx_utils_conv:bin(K) + || {K, _V} <- emqx_bridge_pgsql:fields("config") + ]. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl index 9dff5ab22..7e503cebb 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, pgsql). -define(BRIDGE_TYPE_BIN, <<"pgsql">>). @@ -33,7 +34,18 @@ %%------------------------------------------------------------------------------ all() -> - emqx_common_test_helpers:all(?MODULE). + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. + +matrix_cases() -> + [ + t_disable_prepared_statements + ]. + +groups() -> + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). init_per_suite(Config) -> PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), @@ -80,10 +92,26 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. -init_per_testcase(TestCase, Config) -> - common_init_per_testcase(TestCase, Config). +init_per_group(Group, Config) when + Group =:= postgres; + Group =:= timescale; + Group =:= matrix +-> + [ + {bridge_type, group_to_type(Group)}, + {connector_type, group_to_type(Group)} + | Config + ]; +init_per_group(_Group, Config) -> + Config. -common_init_per_testcase(TestCase, Config) -> +group_to_type(postgres) -> pgsql; +group_to_type(Group) -> Group. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config) -> ct:timetrap(timer:seconds(60)), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_config:delete_override_conf_files(), @@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) -> BridgeConfig = bridge_config(Name, Name), ok = snabbkaffe:start_trace(), [ - {connector_type, ?CONNECTOR_TYPE}, + {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)}, {connector_name, Name}, {connector_config, ConnectorConfig}, - {bridge_type, ?BRIDGE_TYPE}, + {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)}, {bridge_name, Name}, {bridge_config, BridgeConfig} | NConfig @@ -232,3 +260,20 @@ t_sync_query(Config) -> t_start_action_or_source_with_disabled_connector(Config) -> ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok. + +t_disable_prepared_statements(matrix) -> + [[postgres], [timescale], [matrix]]; +t_disable_prepared_statements(Config0) -> + ConnectorConfig0 = ?config(connector_config, Config0), + ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}), + Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + fun make_message/0, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + postgres_bridge_connector_on_query_return + ), + ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index 477da7c9e..914354cad 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_timescale, [ {description, "EMQX Enterprise TimescaleDB Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource]}, {env, [ diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl index 04335b91a..1ebb71d7a 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> timescale. @@ -20,3 +21,9 @@ action_type_name() -> timescale. connector_type_name() -> timescale. schema_module() -> emqx_bridge_timescale. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ). diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 5faf0aa47..2cf3392bf 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ad674a07c..7fe564dc3 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -50,6 +50,8 @@ execute_batch/3 ]). +-export([disable_prepared_statements/0]). + %% for ecpool workers usage -export([do_get_status/1, prepare_sql_to_conn/2]). @@ -62,7 +64,7 @@ #{ pool_name := binary(), query_templates := #{binary() => template()}, - prepares := #{binary() => epgsql:statement()} | {error, _} + prepares := disabled | #{binary() => epgsql:statement()} | {error, _} }. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' @@ -78,7 +80,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - [{server, server()}] ++ + [ + {server, server()}, + disable_prepared_statements() + ] ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). @@ -87,6 +92,17 @@ server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). +disable_prepared_statements() -> + {disable_prepared_statements, + hoconsc:mk( + boolean(), + #{ + default => false, + required => false, + desc => ?DESC("disable_prepared_statements") + } + )}. + adjust_fields(Fields) -> lists:map( fun @@ -108,6 +124,7 @@ on_start( InstId, #{ server := Server, + disable_prepared_statements := DisablePreparedStatements, database := DB, username := User, pool_size := PoolSize, @@ -143,11 +160,16 @@ on_start( {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], - State1 = parse_prepare_sql(Config, <<"send_message">>), + State1 = parse_sql_template(Config, <<"send_message">>), State2 = State1#{installed_channels => #{}}, case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; + Prepares = + case DisablePreparedStatements of + true -> disabled; + false -> #{} + end, + {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -209,13 +231,17 @@ on_add_channel( create_channel_state( ChannelId, - #{pool_name := PoolName} = _ConnectorState, + #{ + pool_name := PoolName, + prepares := Prepares + } = _ConnectorState, #{parameters := Parameters} = _ChannelConfig ) -> - State1 = parse_prepare_sql(Parameters, ChannelId), + State1 = parse_sql_template(Parameters, ChannelId), {ok, init_prepare(State1#{ pool_name => PoolName, + prepares => Prepares, prepare_statement => #{} })}. @@ -233,6 +259,8 @@ on_remove_channel( NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. +close_prepared_statement(_ChannelId, #{prepares := disabled}) -> + ok; close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], close_prepared_statement(WorkerPids, ChannelId, State), @@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% prepared statement doesn't exist. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> - Statement = get_prepared_statement(ChannelId, State), + Statement = get_templated_statement(ChannelId, State), _ = epgsql:close(Conn, Statement), close_prepared_statement(Rest, ChannelId, State); _ -> @@ -303,21 +331,23 @@ on_query( sql => NameOrSQL, state => State }), - Type = pgsql_query_type(TypeOrKey), + Type = pgsql_query_type(TypeOrKey, State), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). -pgsql_query_type(sql) -> +pgsql_query_type(_TypeOrTag, #{prepares := disabled}) -> query; -pgsql_query_type(query) -> +pgsql_query_type(sql, _ConnectorState) -> query; -pgsql_query_type(prepared_query) -> +pgsql_query_type(query, _ConnectorState) -> + query; +pgsql_query_type(prepared_query, _ConnectorState) -> prepared_query; %% for bridge -pgsql_query_type(_) -> - pgsql_query_type(prepared_query). +pgsql_query_type(_, ConnectorState) -> + pgsql_query_type(prepared_query, ConnectorState). on_batch_query( InstId, @@ -336,9 +366,9 @@ on_batch_query( ?SLOG(error, Log), {error, {unrecoverable_error, batch_prepare_not_implemented}}; {_Statement, RowTemplate} -> - PrepStatement = get_prepared_statement(BinKey, State), + StatementTemplate = get_templated_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], - case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of + case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of {error, _Error} = Result -> handle_result(Result); {_Column, Results} -> @@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> + DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled, BinKey = to_bin(TypeOrKey), case get_template(BinKey, State) of undefined -> {SQLOrData, Params}; - {_Statement, RowTemplate} -> - {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} + {Statement, RowTemplate} -> + Rendered = render_prepare_sql_row(RowTemplate, SQLOrData), + case DisablePreparedStatements of + true -> + {Statement, Rendered}; + false -> + {BinKey, Rendered} + end end. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> @@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) -> BinKey = to_bin(Key), maps:get(BinKey, Templates, undefined). -get_prepared_statement(Key, #{installed_channels := Channels} = _State) when +get_templated_statement(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> BinKey = to_bin(Key), ChannelState = maps:get(BinKey, Channels), ChannelPreparedStatements = maps:get(prepares, ChannelState), maps:get(BinKey, ChannelPreparedStatements); -get_prepared_statement(Key, #{prepares := PrepStatements}) -> +get_templated_statement(Key, #{prepares := PrepStatements}) -> BinKey = to_bin(Key), maps:get(BinKey, PrepStatements). @@ -480,6 +517,8 @@ do_check_prepares( {error, Reason} -> {error, Reason} end; +do_check_prepares(#{prepares := disabled}) -> + ok; do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(#{prepares := {error, _}} = State) -> @@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). -parse_prepare_sql(Config, SQLID) -> +parse_sql_template(Config, SQLID) -> Queries = case Config of #{prepare_statement := Qs} -> @@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) -> #{} -> #{} end, - Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), + Templates = maps:fold(fun parse_sql_template/3, #{}, Queries), #{query_templates => Templates}. -parse_prepare_sql(Key, Query, Acc) -> +parse_sql_template(Key, Query, Acc) -> Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Acc#{Key => Template}. @@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) -> {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}), Row. +init_prepare(State = #{prepares := disabled}) -> + State; init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> State; init_prepare(State = #{}) -> diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index fe5c4cd78..515fbe4b0 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -47,7 +47,10 @@ roots() -> []. fields("connection_fields") -> - [{server, server()}] ++ + [ + {server, server()}, + emqx_postgresql:disable_prepared_statements() + ] ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields("config_connector") -> diff --git a/changes/ce/feat-13175.en.md b/changes/ce/feat-13175.en.md new file mode 100644 index 000000000..f673c1352 --- /dev/null +++ b/changes/ce/feat-13175.en.md @@ -0,0 +1,3 @@ +Added the `disable_prepared_statements` option for Postgres-based connectors. + +This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode. diff --git a/rel/i18n/emqx_postgresql.hocon b/rel/i18n/emqx_postgresql.hocon index 9740b0814..159a9a727 100644 --- a/rel/i18n/emqx_postgresql.hocon +++ b/rel/i18n/emqx_postgresql.hocon @@ -14,4 +14,13 @@ config_connector.desc: config_connector.label: """PostgreSQL Connector Config""" +disable_prepared_statements.label: +"""Disable Prepared Statements""" +disable_prepared_statements.desc: +"""~ +Disables the usage of prepared statements in the connections. +Some endpoints, like PGBouncer or Supabase in Transaction mode, do not +support session features such as prepared statements. For such connections, +this option should be enabled.~""" + } diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index af5a34531..7c888af49 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -49,6 +49,7 @@ NIF OCSP OTP PEM +PGBouncer PINGREQ PSK PSK @@ -65,6 +66,7 @@ Riak SHA SMS Struct +Supabase TCP TLS TTL From 3013189cd7c1ad04971b03cd7d785fc3671d0aa0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 4 Jun 2024 11:38:24 -0300 Subject: [PATCH 19/24] fix(resource manager): force kill process if stuck when stopping/removing Fixes https://emqx.atlassian.net/browse/EMQX-12357 --- apps/emqx_resource/src/emqx_resource.erl | 1 + .../src/emqx_resource_manager.erl | 50 +++++++++++++- .../test/emqx_connector_demo.erl | 13 ++++ .../test/emqx_resource_SUITE.erl | 37 +++++++++++ apps/emqx_utils/test/emqx_utils_agent.erl | 66 +++++++++++++++++++ 5 files changed, 164 insertions(+), 3 deletions(-) create mode 100644 apps/emqx_utils/test/emqx_utils_agent.erl diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 8340bf589..9bd6b1390 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,6 +85,7 @@ get_allocated_resources_list/1, forget_allocated_resources/1, deallocate_resource/2, + clean_allocated_resources/2, %% Get channel config from resource call_get_channel_config/3, % Call the format query result function diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a09a6e9f8..4763094d0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -63,6 +63,10 @@ %% Internal exports. -export([worker_resource_health_check/1, worker_channel_health_check/2]). +-ifdef(TEST). +-export([stop/2]). +-endif. + % State record -record(data, { id, @@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) -> -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> try - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => remove, + resource_id => ResId + }), + force_kill(ResId), + ok; + Res -> + Res + end after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition. @@ -287,9 +301,20 @@ start(ResId, Opts) -> %% @doc Stop the resource -spec stop(resource_id()) -> ok | {error, Reason :: term()}. stop(ResId) -> - case safe_call(ResId, stop, ?T_OPERATION) of + stop(ResId, ?T_OPERATION). + +-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}. +stop(ResId, Timeout) -> + case safe_call(ResId, stop, Timeout) of ok -> ok; + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => stop, + resource_id => ResId + }), + force_kill(ResId), + ok; {error, _Reason} = Error -> Error end. @@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when get_error(_ResId, #{error := Error}) -> Error. +force_kill(ResId) -> + case gproc:whereis_name(?NAME(ResId)) of + undefined -> + ok; + Pid when is_pid(Pid) -> + exit(Pid, kill), + try_clean_allocated_resources(ResId), + ok + end. + +try_clean_allocated_resources(ResId) -> + case try_read_cache(ResId) of + #data{mod = Mod} -> + catch emqx_resource:clean_allocated_resources(ResId, Mod), + ok; + _ -> + ok + end. + %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server @@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> Data. stop_resource(#data{state = ResState, id = ResId} = Data) -> - %% We don't care the return value of the Mod:on_stop/2. + %% We don't care about the return value of `Mod:on_stop/2'. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 754727e8c..0fc11cc66 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -71,6 +71,16 @@ set_callback_mode(Mode) -> on_start(_InstId, #{create_error := true}) -> ?tp(connector_demo_start_error, #{}), error("some error"); +on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) -> + ?tp(connector_demo_start_delay, #{}), + case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of + not_called -> + emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep), + timer:sleep(Delay), + on_start(InstId, maps:remove(create_error, Opts)); + called -> + on_start(InstId, maps:remove(create_error, Opts)) + end; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), StopError = maps:get(stop_error, Opts, false), @@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) -> pid => spawn_counter_process(Name, Register) }}. +on_stop(_InstId, undefined) -> + ?tp(connector_demo_free_resources_without_state, #{}), + ok; on_stop(_InstId, #{stop_error := true}) -> {error, stop_error}; on_stop(InstId, #{pid := Pid}) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 05a2f711d..764c65e6f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) -> ), ok. +%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync +%% call such as `on_start', and that the claimed resources, if any, are freed. +t_force_stop(_Config) -> + ?check_trace( + begin + {ok, Agent} = emqx_utils_agent:start_link(not_called), + {ok, _} = + create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{ + name => test_resource, + create_error => {delay, 30_000, Agent} + }, + #{ + health_check_interval => 100, + start_timeout => 100 + } + ), + ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)), + ok + end, + [ + log_consistency_prop(), + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)), + ?assertMatch( + [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace) + ), + ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)), + ok + end + ] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/test/emqx_utils_agent.erl b/apps/emqx_utils/test/emqx_utils_agent.erl new file mode 100644 index 000000000..4280a491f --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_agent.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html). + +-module(emqx_utils_agent). + +%% API +-export([start_link/1, get/1, get_and_update/2]). + +%% `gen_server' API +-export([init/1, handle_call/3]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-type state() :: term(). + +-type get_and_update_fn() :: fun((state()) -> {term(), state()}). + +-record(get_and_update, {fn :: get_and_update_fn()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link(state()) -> gen_server:start_ret(). +start_link(InitState) -> + gen_server:start_link(?MODULE, InitState, []). + +-spec get(gen_server:server_ref()) -> term(). +get(ServerRef) -> + Fn = fun(St) -> {St, St} end, + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term(). +get_and_update(ServerRef, Fn) -> + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(InitState) -> + {ok, InitState}. + +handle_call(#get_and_update{fn = Fn}, _From, State0) -> + {Reply, State} = Fn(State0), + {reply, Reply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ From f17aefe3d7746da91c917907536a7f6ae2b52317 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 4 Jun 2024 11:59:00 -0300 Subject: [PATCH 20/24] fix(action/source api): improve returned error message on timeout --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 13 ++++++++++++- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 ++ changes/ce/fix-13181.en.md | 3 +++ 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-13181.en.md diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index e6feac7bd..79e8fc8f8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1151,7 +1151,18 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefin post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES -> - ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf), + case uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf) of + ok -> + ok; + {error, timeout} -> + throw(<< + "Timed out trying to remove action or source. Please try again and," + " if the error persists, try disabling the connector before retrying." + >>); + {error, not_found} -> + %% Should not happen, unless config is inconsistent. + throw(<<"Referenced connector not found">>) + end, ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf), Bridges = emqx_utils_maps:deep_put( [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 8ba2ef487..99caba625 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -879,6 +879,8 @@ handle_disable_enable(ConfRootKey, Id, Enable) -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); {error, timeout} -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, Reason} when is_binary(Reason) -> + ?BAD_REQUEST(Reason); {error, Reason} -> ?INTERNAL_ERROR(Reason) end diff --git a/changes/ce/fix-13181.en.md b/changes/ce/fix-13181.en.md new file mode 100644 index 000000000..984a9af76 --- /dev/null +++ b/changes/ce/fix-13181.en.md @@ -0,0 +1,3 @@ +Now, when attempting to stop a connector, if such operation times out, we forcefully shut down the connector process. + +Error messages when attempting to disable an action/source when its underlying connector is stuck were also improved. From 9e3c817e5b8d06d44d7c71d31cf75810354c2f4d Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 5 Jun 2024 15:45:16 +0200 Subject: [PATCH 21/24] ci: OTP 26 (26.2.5-1) for docker images --- .../docker-compose-kafka.yaml | 2 +- .ci/docker-compose-file/docker-compose.yaml | 2 +- .github/workflows/_pr_entrypoint.yaml | 16 ++++++++-------- .github/workflows/_push-entrypoint.yaml | 19 +++++++++---------- .../build_and_push_docker_images.yaml | 4 ++-- .github/workflows/build_packages.yaml | 4 ++-- .github/workflows/build_packages_cron.yaml | 6 +++--- .github/workflows/build_slim_packages.yaml | 12 ++++++------ .github/workflows/codeql.yaml | 2 +- .github/workflows/performance_test.yaml | 2 +- .github/workflows/run_relup_tests.yaml | 2 +- .tool-versions | 2 +- Makefile | 2 +- build | 4 ++-- deploy/docker/Dockerfile | 2 +- scripts/buildx.sh | 4 ++-- scripts/pr-sanity-checks.sh | 4 ++-- .../relup-test/start-relup-test-cluster.sh | 2 +- 18 files changed, 45 insertions(+), 46 deletions(-) diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 45dfc3fa0..48bd85ac1 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -18,7 +18,7 @@ services: - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret kdc: hostname: kdc.emqx.net - image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 + image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04 container_name: kdc.emqx.net expose: - 88 # kdc diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index ce98a7ced..212ff78ed 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -3,7 +3,7 @@ version: '3.9' services: erlang: container_name: erlang - image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04} + image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04} env_file: - credentials.env - conf.env diff --git a/.github/workflows/_pr_entrypoint.yaml b/.github/workflows/_pr_entrypoint.yaml index 3b8b1a0cd..9f480d220 100644 --- a/.github/workflows/_pr_entrypoint.yaml +++ b/.github/workflows/_pr_entrypoint.yaml @@ -17,16 +17,16 @@ env: jobs: sanity-checks: runs-on: ubuntu-22.04 - container: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" + container: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" outputs: ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-host: ${{ steps.matrix.outputs.ct-host }} ct-docker: ${{ steps.matrix.outputs.ct-docker }} version-emqx: ${{ steps.matrix.outputs.version-emqx }} version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }} - builder: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" - builder_vsn: "5.3-5" - otp_vsn: "26.2.1-2" + builder: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" + builder_vsn: "5.3-7" + otp_vsn: "26.2.5-1" elixir_vsn: "1.15.7" permissions: @@ -96,13 +96,13 @@ jobs: MATRIX="$(echo "${APPS}" | jq -c ' [ (.[] | select(.profile == "emqx") | . + { - builder: "5.3-5", - otp: "26.2.1-2", + builder: "5.3-7", + otp: "26.2.5-1", elixir: "1.15.7" }), (.[] | select(.profile == "emqx-enterprise") | . + { - builder: "5.3-5", - otp: ["26.2.1-2"][], + builder: "5.3-7", + otp: ["26.2.5-1"][], elixir: "1.15.7" }) ] diff --git a/.github/workflows/_push-entrypoint.yaml b/.github/workflows/_push-entrypoint.yaml index 7033ab989..9c79eb42e 100644 --- a/.github/workflows/_push-entrypoint.yaml +++ b/.github/workflows/_push-entrypoint.yaml @@ -24,7 +24,7 @@ env: jobs: prepare: runs-on: ubuntu-22.04 - container: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' + container: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' outputs: profile: ${{ steps.parse-git-ref.outputs.profile }} release: ${{ steps.parse-git-ref.outputs.release }} @@ -32,9 +32,9 @@ jobs: ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-host: ${{ steps.matrix.outputs.ct-host }} ct-docker: ${{ steps.matrix.outputs.ct-docker }} - builder: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' - builder_vsn: '5.3-5' - otp_vsn: '26.2.1-2' + builder: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' + builder_vsn: '5.3-7' + otp_vsn: '26.2.5-1' elixir_vsn: '1.15.7' permissions: @@ -66,13 +66,13 @@ jobs: MATRIX="$(echo "${APPS}" | jq -c ' [ (.[] | select(.profile == "emqx") | . + { - builder: "5.3-5", - otp: "26.2.1-2", + builder: "5.3-7", + otp: "26.2.5-1", elixir: "1.15.7" }), (.[] | select(.profile == "emqx-enterprise") | . + { - builder: "5.3-5", - otp: ["26.2.1-2"][], + builder: "5.3-7", + otp: ["26.2.5-1"][], elixir: "1.15.7" }) ] @@ -107,8 +107,7 @@ jobs: profile: ${{ needs.prepare.outputs.profile }} publish: true latest: ${{ needs.prepare.outputs.latest }} - # TODO: revert this back to needs.prepare.outputs.otp_vsn when OTP 26 bug is fixed - otp_vsn: 25.3.2-2 + otp_vsn: ${{ needs.prepare.outputs.otp_vsn }} elixir_vsn: ${{ needs.prepare.outputs.elixir_vsn }} builder_vsn: ${{ needs.prepare.outputs.builder_vsn }} secrets: inherit diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index 5c879dc11..f05e4e8fb 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -53,7 +53,7 @@ on: otp_vsn: required: false type: string - default: '25.3.2-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -61,7 +61,7 @@ on: builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' permissions: contents: read diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index ce8536dde..d64416b9b 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -55,7 +55,7 @@ on: otp_vsn: required: false type: string - default: '26.2.1-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -63,7 +63,7 @@ on: builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' permissions: contents: read diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 9cec4244d..c33ecc12a 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -23,8 +23,8 @@ jobs: fail-fast: false matrix: profile: - - ['emqx', 'master', '5.3-5:1.15.7-26.2.1-2'] - - ['emqx', 'release-57', '5.3-5:1.15.7-26.2.1-2'] + - ['emqx', 'master', '5.3-7:1.15.7-26.2.5-1'] + - ['emqx', 'release-57', '5.3-7:1.15.7-26.2.5-1'] os: - ubuntu22.04 - amzn2023 @@ -92,7 +92,7 @@ jobs: branch: - master otp: - - 26.2.1-2 + - 26.2.5-1 os: - macos-12-arm64 diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 7c8e3aaa4..cb7f53358 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -27,15 +27,15 @@ on: builder: required: false type: string - default: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' + default: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' otp_vsn: required: false type: string - default: '26.2.1-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -54,9 +54,9 @@ jobs: fail-fast: false matrix: profile: - - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "x64"] - - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "arm64"] - - ["emqx-enterprise", "26.2.1-2", "ubuntu22.04", "erlang", "x64"] + - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "x64"] + - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "arm64"] + - ["emqx-enterprise", "26.2.5-1", "ubuntu22.04", "erlang", "x64"] container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}" diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index 5bb2d29f0..774f0e344 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -18,7 +18,7 @@ jobs: actions: read security-events: write container: - image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 + image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04 strategy: fail-fast: false diff --git a/.github/workflows/performance_test.yaml b/.github/workflows/performance_test.yaml index 413ac3728..54645ceb7 100644 --- a/.github/workflows/performance_test.yaml +++ b/.github/workflows/performance_test.yaml @@ -26,7 +26,7 @@ jobs: prepare: runs-on: ubuntu-latest if: github.repository_owner == 'emqx' - container: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu20.04 outputs: BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }} PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }} diff --git a/.github/workflows/run_relup_tests.yaml b/.github/workflows/run_relup_tests.yaml index e0ead42a3..b4214771d 100644 --- a/.github/workflows/run_relup_tests.yaml +++ b/.github/workflows/run_relup_tests.yaml @@ -74,7 +74,7 @@ jobs: steps: - uses: erlef/setup-beam@2f0cc07b4b9bea248ae098aba9e1a8a1de5ec24c # v1.17.5 with: - otp-version: 26.2.1 + otp-version: 26.2.5 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: repository: hawk/lux diff --git a/.tool-versions b/.tool-versions index b29f3ab4f..b9c0e8deb 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -erlang 26.2.1-2 +erlang 26.2.5-1 elixir 1.15.7-otp-26 diff --git a/Makefile b/Makefile index 48377475e..037a2b0df 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export EMQX_RELUP ?= true -export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 diff --git a/build b/build index bf5c348f9..b429b7d62 100755 --- a/build +++ b/build @@ -397,9 +397,9 @@ function is_ecr_and_enterprise() { ## Build the default docker image based on debian 12. make_docker() { - local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-5}" + local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-7}" local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}" - local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-25.3.2-2}" + local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-26.2.5-1}" local EMQX_BUILDER_ELIXIR="${EMQX_BUILDER_ELIXIR:-1.15.7}" local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}" diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index fcff6984e..9eb9b2518 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim ARG SOURCE_TYPE=src # tgz diff --git a/scripts/buildx.sh b/scripts/buildx.sh index 3c358a934..c222127b3 100755 --- a/scripts/buildx.sh +++ b/scripts/buildx.sh @@ -9,7 +9,7 @@ ## example: ## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \ -## --builder ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +## --builder ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 set -euo pipefail @@ -24,7 +24,7 @@ help() { echo "--arch amd64|arm64: Target arch to build the EMQX package for" echo "--src_dir : EMQX source code in this dir, default to PWD" echo "--builder : Builder image to pull" - echo " E.g. ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12" + echo " E.g. ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12" } die() { diff --git a/scripts/pr-sanity-checks.sh b/scripts/pr-sanity-checks.sh index be02a337d..ed796d87a 100755 --- a/scripts/pr-sanity-checks.sh +++ b/scripts/pr-sanity-checks.sh @@ -12,8 +12,8 @@ if ! type "yq" > /dev/null; then exit 1 fi -EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-5} -EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.1-2} +EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-7} +EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.5-1} EMQX_BUILDER_ELIXIR=${EMQX_BUILDER_ELIXIR:-1.15.7} EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04} EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} diff --git a/scripts/relup-test/start-relup-test-cluster.sh b/scripts/relup-test/start-relup-test-cluster.sh index 557fbbff9..a972fa210 100755 --- a/scripts/relup-test/start-relup-test-cluster.sh +++ b/scripts/relup-test/start-relup-test-cluster.sh @@ -22,7 +22,7 @@ WEBHOOK="webhook.$NET" BENCH="bench.$NET" COOKIE='this-is-a-secret' ## Erlang image is needed to run webhook server and emqtt-bench -ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" +ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" # builder has emqtt-bench installed BENCH_IMAGE="$ERLANG_IMAGE" From e1e5dc231d3e296ddb75d828e6a97fc65aa9ec59 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 5 Jun 2024 16:30:07 +0200 Subject: [PATCH 22/24] build: use generic env variables for otp and elixir when building docker image --- .github/workflows/build_and_push_docker_images.yaml | 8 ++++---- build | 10 +++++----- scripts/pr-sanity-checks.sh | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index f05e4e8fb..6c49236d4 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -169,8 +169,8 @@ jobs: EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} - EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} - EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} + OTP_VSN: ${{ inputs.otp_vsn }} + ELIXIR_VSN: ${{ inputs.elixir_vsn }} EMQX_SOURCE_TYPE: tgz run: | ./build ${PROFILE} docker @@ -218,8 +218,8 @@ jobs: EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} - EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} - EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} + OTP_VSN: ${{ inputs.otp_vsn }} + ELIXIR_VSN: ${{ inputs.elixir_vsn }} EMQX_SOURCE_TYPE: tgz run: | ./build ${PROFILE} docker diff --git a/build b/build index b429b7d62..bcea44a30 100755 --- a/build +++ b/build @@ -399,9 +399,9 @@ function is_ecr_and_enterprise() { make_docker() { local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-7}" local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}" - local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-26.2.5-1}" - local EMQX_BUILDER_ELIXIR="${EMQX_BUILDER_ELIXIR:-1.15.7}" - local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} + local OTP_VSN="${OTP_VSN:-26.2.5-1}" + local ELIXIR_VSN="${ELIXIR_VSN:-1.15.7}" + local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}} local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}" local EMQX_DOCKERFILE="${EMQX_DOCKERFILE:-deploy/docker/Dockerfile}" local EMQX_SOURCE_TYPE="${EMQX_SOURCE_TYPE:-src}" @@ -465,7 +465,7 @@ make_docker() { --label org.opencontainers.image.description="${PRODUCT_DESCRIPTION}" \ --label org.opencontainers.image.documentation="${DOCUMENTATION_URL}" \ --label org.opencontainers.image.licenses="${LICENSE}" \ - --label org.opencontainers.image.otp.version="${EMQX_BUILDER_OTP}" \ + --label org.opencontainers.image.otp.version="${OTP_VSN}" \ --pull ) :> ./.emqx_docker_image_tags @@ -477,7 +477,7 @@ make_docker() { DOCKER_BUILDX_ARGS+=(--no-cache) fi if [ "${SUFFIX}" = '-elixir' ]; then - DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${EMQX_BUILDER_ELIXIR}") + DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${ELIXIR_VSN}") fi if [ "${DOCKER_LATEST:-false}" = true ]; then for r in "${DOCKER_REGISTRIES[@]}"; do diff --git a/scripts/pr-sanity-checks.sh b/scripts/pr-sanity-checks.sh index ed796d87a..ad8fbaaa4 100755 --- a/scripts/pr-sanity-checks.sh +++ b/scripts/pr-sanity-checks.sh @@ -13,10 +13,10 @@ if ! type "yq" > /dev/null; then fi EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-7} -EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.5-1} -EMQX_BUILDER_ELIXIR=${EMQX_BUILDER_ELIXIR:-1.15.7} +OTP_VSN=${OTP_VSN:-26.2.5-1} +ELIXIR_VSN=${ELIXIR_VSN:-1.15.7} EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04} -EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} +EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}} commands=$(yq ".jobs.sanity-checks.steps[].run" .github/workflows/_pr_entrypoint.yaml | grep -v null) From c7f585cc218d4583d68cac9a930b2982af1e2c31 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 5 Jun 2024 21:54:17 +0200 Subject: [PATCH 23/24] fix(bin/emqx): ensure -noinput is prior to -noshell iex seems to depend on this order, otherwise it terminiates right after boot --- bin/emqx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/emqx b/bin/emqx index 1db57d7e8..3bd86db0c 100755 --- a/bin/emqx +++ b/bin/emqx @@ -1199,7 +1199,7 @@ case "${COMMAND}" in esac case "$COMMAND" in foreground) - FOREGROUNDOPTIONS="-enable-feature maybe_expr -noshell -noinput +Bd" + FOREGROUNDOPTIONS="-enable-feature maybe_expr -noinput -noshell +Bd" ;; *) FOREGROUNDOPTIONS='-enable-feature maybe_expr' From c41be7ee22b310e46786116e20088b84bb528a3e Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 5 Jun 2024 23:37:21 +0200 Subject: [PATCH 24/24] chore: fix dialyzer warning --- .../emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 0f977e81e..4b3135f5f 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, [ diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 726d2656a..603ef18d0 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -78,7 +78,7 @@ %% https://www.erlang.org/doc/man/odbc.html %% as returned by connect/2 --type connection_reference() :: pid(). +-type connection_reference() :: odbc:connection_reference(). -type time_out() :: milliseconds() | infinity. -type sql() :: string() | binary(). -type milliseconds() :: pos_integer(). @@ -478,7 +478,7 @@ worker_do_insert( {error, {unrecoverable_error, {invalid_request, Reason}}} end. --spec execute(pid(), sql()) -> +-spec execute(connection_reference(), sql()) -> updated_tuple() | selected_tuple() | [updated_tuple()] @@ -487,7 +487,7 @@ worker_do_insert( execute(Conn, SQL) -> odbc:sql_query(Conn, str(SQL)). --spec execute(pid(), sql(), time_out()) -> +-spec execute(connection_reference(), sql(), time_out()) -> updated_tuple() | selected_tuple() | [updated_tuple()]