diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 26ba3b1ec..69a6e4b00 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -23,6 +23,7 @@ jobs: profile: - ['emqx', 'master'] - ['emqx', 'release-57'] + - ['emqx', 'release-58'] os: - ubuntu22.04 - amzn2023 diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index f086b97f6..f6077262f 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -24,6 +24,7 @@ jobs: branch: - master - release-57 + - release-58 language: - cpp - python diff --git a/.github/workflows/green_master.yaml b/.github/workflows/green_master.yaml index 8479ea5ed..a5317027b 100644 --- a/.github/workflows/green_master.yaml +++ b/.github/workflows/green_master.yaml @@ -24,6 +24,7 @@ jobs: ref: - master - release-57 + - release-58 steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 with: diff --git a/Makefile b/Makefile index b803e639b..69667c952 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,8 @@ include env.sh # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.9.2 -export EMQX_EE_DASHBOARD_VERSION ?= e1.7.2 +export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.1 export EMQX_RELUP ?= true export EMQX_REL_FORM ?= tgz diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 4357f7aef..7979d00fd 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.7.2"). +-define(EMQX_RELEASE_CE, "5.8.0-alpha.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.7.2"). +-define(EMQX_RELEASE_EE, "5.8.0-alpha.1"). diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b98728ed1..6e629e9fa 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,7 +28,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.2"}}}, diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 20b1445c9..fca68ba8c 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.3.3"}, + {vsn, "5.3.4"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx_auth/src/emqx_auth.app.src b/apps/emqx_auth/src/emqx_auth.app.src index d61ba281b..d2212ffe2 100644 --- a/apps/emqx_auth/src/emqx_auth.app.src +++ b/apps/emqx_auth/src/emqx_auth.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth, [ {description, "EMQX Authentication and authorization"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {modules, []}, {registered, [emqx_auth_sup]}, {applications, [ diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl index 8bc60a600..e76d52535 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl @@ -477,9 +477,15 @@ authorize_deny( sources() ) -> authz_result(). -authorize(Client, PubSub, Topic, _DefaultResult, Sources) -> +authorize(#{username := Username} = Client, PubSub, Topic, _DefaultResult, Sources) -> case maps:get(is_superuser, Client, false) of true -> + ?tp(authz_skipped, #{reason => client_is_superuser, action => PubSub}), + ?TRACE("AUTHZ", "authorization_skipped_as_superuser", #{ + username => Username, + topic => Topic, + action => emqx_access_control:format_action(PubSub) + }), emqx_metrics:inc(?METRIC_SUPERUSER), {stop, #{result => allow, from => superuser}}; false -> diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl index 575eb4109..4745d7ec6 100644 --- a/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl @@ -674,5 +674,77 @@ t_publish_last_will_testament_banned_client_connecting(_Config) -> ok. +t_sikpped_as_superuser(_Config) -> + ClientInfo = #{ + clientid => <<"clientid">>, + username => <<"username">>, + peerhost => {127, 0, 0, 1}, + zone => default, + listener => {tcp, default}, + is_superuser => true + }, + ?check_trace( + begin + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_0), <<"p/t/0">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_1), <<"p/t/1">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_PUBLISH(?QOS_2), <<"p/t/2">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_0), <<"s/t/0">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_1), <<"s/t/1">>) + ), + ?assertEqual( + allow, + emqx_access_control:authorize(ClientInfo, ?AUTHZ_SUBSCRIBE(?QOS_2), <<"s/t/2">>) + ) + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_0, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_1, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_2, action_type := publish} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_0, action_type := subscribe} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_1, action_type := subscribe} + }, + #{ + reason := client_is_superuser, + action := #{qos := ?QOS_2, action_type := subscribe} + } + ], + ?of_kind(authz_skipped, Trace) + ), + ok + end + ), + + ok = snabbkaffe:stop(). + stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps). diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index 9cf62ae15..b2885711d 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_http, [ {description, "EMQX External HTTP API Authentication and Authorization"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, []}, {mod, {emqx_auth_http_app, []}}, {applications, [ diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src index 1edb5fc67..885a0002b 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_jwt, [ {description, "EMQX JWT Authentication and Authorization"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {mod, {emqx_auth_jwt_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 7fcdda1d3..3e862e474 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mnesia, [ {description, "EMQX Buitl-in Database Authentication and Authorization"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {mod, {emqx_auth_mnesia_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src index 5ffc59787..837f20230 100644 --- a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src +++ b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mongodb, [ {description, "EMQX MongoDB Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_mongodb_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src index abd9a7e27..07329c5b0 100644 --- a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src +++ b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mysql, [ {description, "EMQX MySQL Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_mysql_app, []}}, {applications, [ diff --git a/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src b/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src index 1eabc93f0..1fddca42f 100644 --- a/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src +++ b/apps/emqx_auth_postgresql/src/emqx_auth_postgresql.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_postgresql, [ {description, "EMQX PostgreSQL Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_postgresql_app, []}}, {applications, [ diff --git a/apps/emqx_auth_redis/src/emqx_auth_redis.app.src b/apps/emqx_auth_redis/src/emqx_auth_redis.app.src index 9b43eca2c..7d82242a7 100644 --- a/apps/emqx_auth_redis/src/emqx_auth_redis.app.src +++ b/apps/emqx_auth_redis/src/emqx_auth_redis.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_redis, [ {description, "EMQX Redis Authentication and Authorization"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_auth_redis_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 30930c494..ea0339943 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6b160f3b3..fdbbc5376 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1154,7 +1154,7 @@ t_bridges_probe(Config) -> ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, - <<"message">> := <<"Connection refused">> + <<"message">> := <<"Connection refused", _/binary>> }}, request_json( post, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index d98f4f926..46d1883bd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -889,7 +889,8 @@ t_sync_query_down(Config, Opts) -> ), ?force_ordering( - #{?snk_kind := call_query}, + #{?snk_kind := SNKKind} when + SNKKind =:= call_query orelse SNKKind =:= simple_query_enter, #{?snk_kind := cut_connection, ?snk_span := start} ), %% Note: order of arguments here is reversed compared to `?force_ordering'. @@ -913,6 +914,7 @@ t_sync_query_down(Config, Opts) -> emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) ) end), + ?tp("publishing_message", #{}), try {_, {ok, _}} = snabbkaffe:wait_async_action( @@ -921,6 +923,7 @@ t_sync_query_down(Config, Opts) -> infinity ) after + ?tp("healing_failure", #{}), emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort) end, {ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index a39c4be99..6d786a2bd 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 8b8d379e4..e215014a9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_http, [ {description, "EMQX HTTP Bridge and Connector Application"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, ehttpc]}, {env, [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 0e906203d..5a8973666 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 74d3a5f54..d7408f0e5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1918,13 +1918,14 @@ t_node_joins_existing_cluster(Config) -> _Attempts2 = 50, [] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic]) ), + NumMsgs = 50 * NPartitions, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{ ?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _} }), - NPartitions, + NumMsgs, 20_000 ), lists:foreach( @@ -1933,7 +1934,7 @@ t_node_joins_existing_cluster(Config) -> Val = <<"v", (integer_to_binary(N))/binary>>, publish(Config, KafkaTopic, [#{key => Key, value => Val}]) end, - lists:seq(1, 10 * NPartitions) + lists:seq(1, NumMsgs) ), {ok, _} = snabbkaffe:receive_events(SRef1), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index d43ec5591..0d3796398 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 118542356..6b9a40123 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) -> server => Server }}; {error, Reason} -> - {error, Reason} + {error, emqx_maybe:define(explain_error(Reason), Reason)} end. on_add_channel( @@ -200,7 +200,7 @@ on_get_channel_status( } = _State ) when is_map_key(ChannelId, Channels) -> %% The channel should be ok as long as the MQTT client is ok - connected. + ?status_connected. on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). @@ -356,10 +356,15 @@ on_get_status(_ResourceId, State) -> Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> - combine_status(Statuses) + case combine_status(Statuses) of + {Status, Msg} -> + {Status, State, Msg}; + Status -> + Status + end catch exit:timeout -> - connecting + ?status_connecting end. get_status({_Pool, Worker}) -> @@ -367,7 +372,7 @@ get_status({_Pool, Worker}) -> {ok, Client} -> emqx_bridge_mqtt_ingress:status(Client); {error, _} -> - disconnected + ?status_disconnected end. combine_status(Statuses) -> @@ -375,11 +380,25 @@ combine_status(Statuses) -> %% Natural order of statuses: [connected, connecting, disconnected] %% * `disconnected` wins over any other status %% * `connecting` wins over `connected` - case lists:reverse(lists:usort(Statuses)) of + ToStatus = fun + ({S, _Reason}) -> S; + (S) when is_atom(S) -> S + end, + CompareFn = fun(S1A, S2A) -> + S1 = ToStatus(S1A), + S2 = ToStatus(S2A), + S1 > S2 + end, + case lists:usort(CompareFn, Statuses) of + [{Status, Reason} | _] -> + case explain_error(Reason) of + undefined -> Status; + Msg -> {Status, Msg} + end; [Status | _] -> Status; [] -> - disconnected + ?status_disconnected end. mk_ingress_config( @@ -514,15 +533,54 @@ connect(Pid, Name) -> {ok, Pid}; {error, Reason} = Error -> IsDryRun = emqx_resource:is_dry_run(Name), - ?SLOG(?LOG_LEVEL(IsDryRun), #{ - msg => "ingress_client_connect_failed", - reason => Reason, - resource_id => Name - }), + log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name), _ = catch emqtt:stop(Pid), Error end. +log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, econnrefused = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, Reason, Name) -> + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }). + +explain_error(econnrefused) -> + << + "Connection refused. " + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >>; +explain_error({tcp_closed, _}) -> + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >>; +explain_error(_Reason) -> + undefined. + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 1749d4194..35aea67a6 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). %% management APIs -export([ @@ -234,13 +235,13 @@ status(Pid) -> try case proplists:get_value(socket, info(Pid)) of Socket when Socket /= undefined -> - connected; + ?status_connected; undefined -> - connecting + ?status_connecting end catch exit:{noproc, _} -> - disconnected + ?status_disconnected end. %% diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index a220eb9f7..42cf9d2b8 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -1025,31 +1025,39 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> ct:sleep(1000), %% stop the listener 1883 to make the bridge disconnected - ok = emqx_listeners:stop_listener('tcp:default'), - ct:sleep(1500), - ?assertMatch( - #{<<"status">> := Status} when - Status == <<"connecting">> orelse Status == <<"disconnected">>, - request_bridge(BridgeIDEgress) + ?check_trace( + begin + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when + Status == <<"connecting">> orelse Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), + + N = stop_publisher(Publisher), + + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)), + ok + end ), - - %% start the listener 1883 to make the bridge reconnected - ok = emqx_listeners:start_listener('tcp:default'), - timer:sleep(1500), - ?assertMatch( - #{<<"status">> := <<"connected">>}, - request_bridge(BridgeIDEgress) - ), - - N = stop_publisher(Publisher), - - %% all those messages should eventually be delivered - [ - assert_mqtt_msg_received(RemoteTopic, Payload) - || I <- lists:seq(1, N), - Payload <- [integer_to_binary(I)] - ], - ok. start_publisher(Topic, Interval, CtrlPid) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index b9097b9c3..e0598fa1e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -131,6 +131,9 @@ hookpoint(Config) -> BridgeId = bridge_id(Config), emqx_bridge_resource:bridge_hookpoint(BridgeId). +simplify_result(Res) -> + emqx_bridge_v2_testlib:simplify_result(Res). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -246,3 +249,46 @@ t_receive_via_rule(Config) -> end ), ok. + +t_connect_with_more_clients_than_the_broker_accepts(Config) -> + Name = ?config(connector_name, Config), + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + ?check_trace( + #{timetrap => 10_000}, + begin + ?assertMatch( + {201, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result( + emqx_bridge_v2_testlib:create_connector_api( + Config, + #{<<"pool_size">> => 100} + ) + ) + ), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + ?assertMatch( + {200, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name)) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end + ), + + ok. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index dcb86a3ca..93603db21 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl index 6d15687f6..fb9a38cc6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -11,7 +11,8 @@ action_type_name/0, connector_type_name/0, schema_module/0, - is_action/1 + is_action/1, + connector_action_config_to_bridge_v1_config/2 ]). is_action(_) -> true. @@ -23,3 +24,28 @@ action_type_name() -> pulsar. connector_type_name() -> pulsar. schema_module() -> emqx_bridge_pulsar_pubsub_schema. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig + ), + BridgeV1Config = maps:with(v1_fields(pulsar_producer), BridgeV1Config1), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun(RO) -> maps:with(v1_fields(producer_resource_opts), RO) end, + BridgeV1Config + ). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +v1_fields(Struct) -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_pulsar:fields(Struct) + ]. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 64dde77fb..470f7f832 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -60,6 +60,8 @@ resource_type() -> pulsar. callback_mode() -> async_if_possible. +query_mode(#{resource_opts := #{query_mode := sync}}) -> + simple_sync_internal_buffer; query_mode(_Config) -> simple_async_internal_buffer. @@ -204,12 +206,17 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> sync_timeout => SyncTimeout, is_async => false }), - try - pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) - catch - error:timeout -> - {error, timeout} - end + ?tp_span( + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => sync}, + try + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => sync}), + pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) + catch + error:timeout -> + {error, timeout} + end + ) end. -spec on_query_async( @@ -220,11 +227,11 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> - {error, channel_not_found}; + {error, {unrecoverable_error, channel_not_found}}; {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( - pulsar_producer_on_query_async, - #{instance_id => _InstanceId, message => Message}, + "pulsar_producer_query_enter", + #{instance_id => _InstanceId, message => Message, mode => async}, on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ) end. @@ -235,6 +242,7 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> message => PulsarMessage, is_async => true }), + ?tp("pulsar_producer_send", #{msg => PulsarMessage, mode => async}), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). on_format_query_result({ok, Info}) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index dff62843e..515fcdb5a 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -66,10 +66,8 @@ fields(action_resource_opts) -> batch_size, batch_time, worker_pool_size, - request_ttl, inflight_window, - max_buffer_bytes, - query_mode + max_buffer_bytes ], lists:filter( fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index cd54e2194..0a908f5be 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -843,7 +843,8 @@ do_t_send_with_failure(Config, FailureType) -> ?wait_async_action( emqx:publish(Message0), #{ - ?snk_kind := pulsar_producer_on_query_async, + ?snk_kind := "pulsar_producer_query_enter", + mode := async, ?snk_span := {complete, _} }, 5_000 @@ -970,7 +971,11 @@ t_producer_process_crash(Config) -> {_, {ok, _}} = ?wait_async_action( emqx:publish(Message0), - #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}}, + #{ + ?snk_kind := "pulsar_producer_query_enter", + mode := async, + ?snk_span := {complete, _} + }, 5_000 ), Data0 = receive_consumed(20_000), diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 11caa15c6..94534fafd 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -23,31 +23,25 @@ %%------------------------------------------------------------------------------ all() -> - [ - {group, plain}, - {group, tls} - ]. + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {plain, AllTCs}, - {tls, AllTCs} - ]. + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). + +matrix_cases() -> + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% Ensure enterprise bridge module is loaded - _ = emqx_bridge_enterprise:module_info(), - {ok, Cwd} = file:get_cwd(), - PrivDir = ?config(priv_dir, Config), - WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), Apps = emqx_cth_suite:start( lists:flatten([ ?APPS, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ]), - #{work_dir => WorkDir} + #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{suite_apps, Apps} | Config]. @@ -61,6 +55,7 @@ init_per_group(plain = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -70,7 +65,7 @@ init_per_group(plain = Type, Config) -> {use_tls, false} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() @@ -82,6 +77,7 @@ init_per_group(tls = Type, Config) -> case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of true -> Config1 = common_init_per_group(), + ConnectorName = ?MODULE, NewConfig = [ {proxy_name, ProxyName}, @@ -91,17 +87,21 @@ init_per_group(tls = Type, Config) -> {use_tls, true} | Config1 ++ Config ], - create_connector(?MODULE, NewConfig), + create_connector(ConnectorName, NewConfig), NewConfig; false -> maybe_skip_without_ci() - end. + end; +init_per_group(_Group, Config) -> + Config. end_per_group(Group, Config) when Group =:= plain; Group =:= tls -> common_end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> ok. common_init_per_group() -> @@ -189,66 +189,49 @@ pulsar_connector(Config) -> ":", integer_to_binary(PulsarPort) ]), - Connector = #{ - <<"connectors">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"enable">> => true, - <<"ssl">> => #{ - <<"enable">> => UseTLS, - <<"verify">> => <<"verify_none">>, - <<"server_name_indication">> => <<"auto">> - }, - <<"authentication">> => <<"none">>, - <<"servers">> => ServerURL - } - } - } + InnerConfigMap = #{ + <<"enable">> => true, + <<"ssl">> => #{ + <<"enable">> => UseTLS, + <<"verify">> => <<"verify_none">>, + <<"server_name_indication">> => <<"auto">> + }, + <<"authentication">> => <<"none">>, + <<"servers">> => ServerURL }, - parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + emqx_bridge_v2_testlib:parse_and_check_connector(?TYPE, Name, InnerConfigMap). pulsar_action(Config) -> + QueryMode = proplists:get_value(query_mode, Config, <<"sync">>), Name = atom_to_binary(?MODULE), - Action = #{ - <<"actions">> => #{ - <<"pulsar">> => #{ - Name => #{ - <<"connector">> => Name, - <<"enable">> => true, - <<"parameters">> => #{ - <<"retention_period">> => <<"infinity">>, - <<"max_batch_bytes">> => <<"1MB">>, - <<"batch_size">> => 100, - <<"strategy">> => <<"random">>, - <<"buffer">> => #{ - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"10MB">>, - <<"segment_bytes">> => <<"5MB">>, - <<"memory_overload_protection">> => true - }, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"value">> => <<"${.}">> - }, - <<"pulsar_topic">> => ?config(pulsar_topic, Config) - }, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"1s">>, - <<"metrics_flush_interval">> => <<"300ms">> - } - } - } + InnerConfigMap = #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"retention_period">> => <<"infinity">>, + <<"max_batch_bytes">> => <<"1MB">>, + <<"batch_size">> => 100, + <<"strategy">> => <<"random">>, + <<"buffer">> => #{ + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">>, + <<"memory_overload_protection">> => true + }, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.}">> + }, + <<"pulsar_topic">> => ?config(pulsar_topic, Config) + }, + <<"resource_opts">> => #{ + <<"query_mode">> => QueryMode, + <<"request_ttl">> => <<"1s">>, + <<"health_check_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"300ms">> } }, - parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). - -parse_and_check(Key, Mod, Conf, Name) -> - ConfStr = hocon_pp:do(Conf, #{}), - ct:pal(ConfStr), - {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), - hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), - #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf, - RetConf. + emqx_bridge_v2_testlib:parse_and_check(action, ?TYPE, Name, InnerConfigMap). instance_id(Type, Name) -> ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), @@ -404,20 +387,44 @@ assert_status_api(Line, Type, Name, Status) -> ). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). +proplists_with(Keys, PList) -> + lists:filter(fun({K, _}) -> lists:member(K, Keys) end, PList). + +group_path(Config) -> + case emqx_common_test_helpers:group_path(Config) of + [] -> + undefined; + Path -> + Path + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_action_probe(Config) -> +t_action_probe(matrix) -> + [[plain], [tls]]; +t_action_probe(Config) when is_list(Config) -> Name = atom_to_binary(?FUNCTION_NAME), Action = pulsar_action(Config), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), ?assertMatch({{_, 204, _}, _, _}, Res0), ok. -t_action(Config) -> +t_action(matrix) -> + [ + [plain, async], + [plain, sync], + [tls, async] + ]; +t_action(Config) when is_list(Config) -> + QueryMode = + case group_path(Config) of + [_, QM | _] -> atom_to_binary(QM); + _ -> <<"async">> + end, Name = atom_to_binary(?FUNCTION_NAME), - create_action(Name, Config), + create_action(Name, [{query_mode, QueryMode} | Config]), Actions = emqx_bridge_v2:list(actions), Any = fun(#{name := BName}) -> BName =:= Name end, ?assert(lists:any(Any, Actions), Actions), @@ -465,7 +472,9 @@ t_action(Config) -> %% Tests that deleting/disabling an action that share the same Pulsar topic with other %% actions do not disturb the latter. -t_multiple_actions_sharing_topic(Config) -> +t_multiple_actions_sharing_topic(matrix) -> + [[plain], [tls]]; +t_multiple_actions_sharing_topic(Config) when is_list(Config) -> Type = ?TYPE, ConnectorName = <<"c">>, ConnectorConfig = pulsar_connector(Config), @@ -546,3 +555,31 @@ t_multiple_actions_sharing_topic(Config) -> [] ), ok. + +t_sync_query_down(matrix) -> + [[plain]]; +t_sync_query_down(Config0) when is_list(Config0) -> + ct:timetrap({seconds, 15}), + Payload = #{<<"x">> => <<"some data">>}, + PayloadBin = emqx_utils_json:encode(Payload), + ClientId = <<"some_client">>, + Opts = #{ + make_message_fn => fun(Topic) -> emqx_message:make(ClientId, Topic, PayloadBin) end, + enter_tp_filter => + ?match_event(#{?snk_kind := "pulsar_producer_send"}), + error_tp_filter => + ?match_event(#{?snk_kind := "resource_simple_sync_internal_buffer_query_timeout"}), + success_tp_filter => + ?match_event(#{?snk_kind := pulsar_echo_consumer_message}) + }, + Config = [ + {connector_type, ?TYPE}, + {connector_name, ?FUNCTION_NAME}, + {connector_config, pulsar_connector(Config0)}, + {action_type, ?TYPE}, + {action_name, ?FUNCTION_NAME}, + {action_config, pulsar_action(Config0)} + | proplists_with([proxy_name, proxy_host, proxy_port], Config0) + ], + emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts), + ok. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index c178b1f5e..1ff42961b 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {mod, {emqx_bridge_rabbitmq_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index da9cd1a96..49d6a5985 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_s3, [ {description, "EMQX Enterprise S3 Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 009a8d16b..4060c926e 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, [ diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index cd1d51b01..59276e32e 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_syskeeper, [ {description, "EMQX Enterprise Data bridge for Syskeeper"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index dc406b735..80b504699 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index ea9e582ae..473555e6c 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index 2835feb34..3a595629e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -2,7 +2,7 @@ {application, emqx_dashboard, [ {description, "EMQX Web Dashboard"}, % strict semver, bump manually! - {vsn, "5.1.3"}, + {vsn, "5.1.4"}, {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [ diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src index 95d49a150..f3d11d445 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard_sso, [ {description, "EMQX Dashboard Single Sign-On"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, [emqx_dashboard_sso_sup]}, {applications, [ kernel, diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index e9c1f2b4a..54bd68562 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.9"}, + {vsn, "0.1.10"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 1d5cb85b8..230692c94 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.12"}, + {vsn, "0.1.13"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index f96d112e9..2e5eece56 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 8d1e33f74..94a6dc767 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 1dc3f6939..f3a39c5a1 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index eb471458f..09d3f1847 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.3.3"}, + {vsn, "0.3.4"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl, redbug]}, diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index c22793cf0..aca888aa6 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -2,7 +2,7 @@ {application, emqx_management, [ {description, "EMQX Management API and CLI"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, []}, {registered, [emqx_management_sup]}, {applications, [ diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 2742167f5..035af188f 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/logger.hrl"). -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}). +-define(EXCLUSIVE_TAB, emqx_exclusive_subscription). -export([load/0]). @@ -45,7 +46,8 @@ olp/1, data/1, ds/1, - cluster_info/0 + cluster_info/0, + exclusive/1 ]). -spec load() -> ok. @@ -1024,7 +1026,9 @@ print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) -> NL = maps:get(nl, Options, 0), RH = maps:get(rh, Options, 0), RAP = maps:get(rap, Options, 0), - emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]). + emqx_ctl:print("~ts -> topic:~ts qos:~p nl:~p rh:~p rap:~p~n", [SubId, Topic, QoS, NL, RH, RAP]); +print({exclusive, {exclusive_subscription, Topic, ClientId}}) -> + emqx_ctl:print("topic:~ts -> ClientId:~ts~n", [Topic, ClientId]). format(_, undefined) -> undefined; @@ -1085,3 +1089,19 @@ safe_call_mria(Fun, Args, OnFail) -> }), OnFail end. +%%-------------------------------------------------------------------- +%% @doc Exclusive topics +exclusive(["list"]) -> + case ets:info(?EXCLUSIVE_TAB, size) of + 0 -> emqx_ctl:print("No topics.~n"); + _ -> dump(?EXCLUSIVE_TAB, exclusive) + end; +exclusive(["delete", Topic0]) -> + Topic = erlang:iolist_to_binary(Topic0), + emqx_exclusive_subscription:unsubscribe(Topic, #{is_exclusive => true}), + emqx_ctl:print("ok~n"); +exclusive(_) -> + emqx_ctl:usage([ + {"exclusive list", "List all exclusive topics"}, + {"exclusive delete ", "Delete an exclusive topic"} + ]). diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index f85fdbe5b..16871c129 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -360,4 +360,9 @@ t_autocluster_leave(Config) -> ) ). +t_exclusive(_Config) -> + emqx_ctl:run_command(["exclusive", "list"]), + emqx_ctl:run_command(["exclusive", "delete", "t/1"]), + ok. + format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]). diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 19caf6763..a25bcd7ca 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_modules, [ {description, "EMQX Modules"}, - {vsn, "5.0.27"}, + {vsn, "5.0.28"}, {modules, []}, {applications, [kernel, stdlib, emqx, emqx_ctl, observer_cli]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index c6cfae12b..dfdcf0fcd 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.9"}, + {vsn, "5.0.10"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index 9c4cec6f9..7ee2eb540 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx, erlavro]}, diff --git a/apps/emqx_prometheus/src/emqx_prometheus.app.src b/apps/emqx_prometheus/src/emqx_prometheus.app.src index e5bb770cd..3e525657c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.app.src +++ b/apps/emqx_prometheus/src/emqx_prometheus.app.src @@ -2,7 +2,7 @@ {application, emqx_prometheus, [ {description, "Prometheus for EMQX"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, []}, {registered, [emqx_prometheus_sup]}, {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]}, diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 6e35949a9..54418adcd 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9c1b398ff..e37917215 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -198,6 +198,9 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), case simple_async_query(Id, Request, QueryOpts) of {error, _} = Error -> + ?tp("resource_simple_sync_internal_buffer_query_error", #{ + id => Id, request => Request + }), Error; {async_return, {error, _} = Error} -> Error; @@ -210,7 +213,11 @@ simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> receive {ReplyAlias, Response} -> Response - after 0 -> {error, timeout} + after 0 -> + ?tp("resource_simple_sync_internal_buffer_query_timeout", #{ + id => Id, request => Request + }), + {error, timeout} end end end @@ -1324,6 +1331,7 @@ do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Res ?tp(simple_query_override, #{query_mode => ReqQM}), #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer @@ -1331,6 +1339,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Res %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource, CallMode = call_mode(QM, CBM), + ?tp(simple_query_enter, #{}), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) -> %% when calling from the buffer worker or other simple queries, @@ -2327,6 +2336,7 @@ reply_call(Alias, Response) -> %% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' %% callbacks. reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> + ?tp("reply_call_internal_buffer", #{}), ?MODULE:reply_call(ReplyAlias, Response), do_reply_caller(MaybeReplyTo, Response). diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index b2ec221e3..9c3f5eed4 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.3"}, + {vsn, "5.2.4"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/changes/ce/feat-13524.en.md b/changes/ce/feat-13524.en.md new file mode 100644 index 000000000..efe21104f --- /dev/null +++ b/changes/ce/feat-13524.en.md @@ -0,0 +1 @@ +Added CLI interface `emqx ctl exclusive` for the feature exclusive topics. diff --git a/changes/ce/feat-13534.en.md b/changes/ce/feat-13534.en.md new file mode 100644 index 000000000..5c5af0bf5 --- /dev/null +++ b/changes/ce/feat-13534.en.md @@ -0,0 +1 @@ +Add trace logging when superuser skipped authz check. diff --git a/changes/ce/fix-13425.en.md b/changes/ce/fix-13425.en.md new file mode 100644 index 000000000..e02e99c0a --- /dev/null +++ b/changes/ce/fix-13425.en.md @@ -0,0 +1 @@ +The MQTT connector error log messages have been improved to provide clearer and more detailed information. diff --git a/changes/ee/feat-13546.en.md b/changes/ee/feat-13546.en.md new file mode 100644 index 000000000..c403409ac --- /dev/null +++ b/changes/ee/feat-13546.en.md @@ -0,0 +1 @@ +Added the option to configure the query mode for Pulsar Producer action. diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index 5d19cf95a..3c25b9ebc 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.7.2 +version: 5.8.0-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.7.2 +appVersion: 5.8.0-alpha.1 diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 2948db061..49a3f7c84 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.7.2 +version: 5.8.0-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.7.2 +appVersion: 5.8.0-alpha.1 diff --git a/mix.exs b/mix.exs index 96bb32632..d292e0069 100644 --- a/mix.exs +++ b/mix.exs @@ -182,7 +182,7 @@ defmodule EMQXUmbrella.MixProject do end def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true} - def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true} + def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.12.0", override: true} def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true} def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.2", override: true} def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true} diff --git a/rebar.config b/rebar.config index 68b2df1d7..ad70d128d 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.12.0"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, diff --git a/scripts/rel/cut.sh b/scripts/rel/cut.sh index 1affd48bf..8c8899b91 100755 --- a/scripts/rel/cut.sh +++ b/scripts/rel/cut.sh @@ -135,6 +135,12 @@ rel_branch() { e5.7.*) echo 'release-57' ;; + v5.8.*) + echo 'release-58' + ;; + e5.8.*) + echo 'release-58' + ;; *) logerr "Unsupported version tag $TAG" exit 1 diff --git a/scripts/rel/sync-remotes.sh b/scripts/rel/sync-remotes.sh index 430021a79..c986535ce 100755 --- a/scripts/rel/sync-remotes.sh +++ b/scripts/rel/sync-remotes.sh @@ -5,7 +5,7 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." -BASE_BRANCHES=( 'release-57' 'release-56' 'release-55' 'master' ) +BASE_BRANCHES=( 'release-58' 'release-57' 'release-56' 'release-55' 'master' ) usage() { cat <