diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 97fc51363..46fb16ec9 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.7.0"). +-define(EMQX_RELEASE_CE, "5.7.1-alpha.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.7.0"). +-define(EMQX_RELEASE_EE, "5.7.1-alpha.1"). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 0b2e9277a..f9c161e0f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -164,7 +164,7 @@ -type root_cfg_key() :: ?ROOT_KEY_ACTIONS | ?ROOT_KEY_SOURCES. --export_type([root_cfg_key/0]). +-export_type([root_cfg_key/0, bridge_v2_type/0, bridge_v2_name/0]). %%==================================================================== diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index badddb20f..8c3223e8b 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_greptimedb, [ {description, "EMQX GreptimeDB Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 1cd808e46..be52f4469 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -107,6 +107,10 @@ on_start(InstId, Config) -> %% See: greptimedb:start_client/1 start_client(InstId, Config). +on_stop(InstId, #{client := Client}) -> + Res = greptimedb:stop_client(Client), + ?tp(greptimedb_client_stopped, #{instance_id => InstId}), + Res; on_stop(InstId, _State) -> case emqx_resource:get_allocated_resources(InstId) of #{?greptime_client := Client} -> diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl index 96cf0d7c9..79ad2f999 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl @@ -50,18 +50,15 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - delete_all_bridges(), - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_connector_test_helpers:stop_apps([ - emqx_conf, emqx_bridge, emqx_resource, emqx_rule_engine - ]), - _ = application:stop(emqx_connector), ok. init_per_group(GreptimedbType, Config0) when GreptimedbType =:= grpcv1_tcp; GreptimedbType =:= grpcv1_tls -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), #{ host := GreptimedbHost, port := GreptimedbPort, @@ -89,13 +86,18 @@ init_per_group(GreptimedbType, Config0) when end, case emqx_common_test_helpers:is_tcp_server_available(GreptimedbHost, GreptimedbHttpPort) of true -> - ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), - ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = start_apps(), - {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(greptimedb), - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_greptimedb, + emqx_bridge, + emqx_rule_engine, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config0)} + ), Config = [{use_tls, UseTLS} | Config0], {Name, ConfigString, GreptimedbConfig} = greptimedb_config( grpcv1, GreptimedbHost, GreptimedbPort, Config @@ -116,6 +118,7 @@ init_per_group(GreptimedbType, Config0) when ], {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), [ + {group_apps, Apps}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, {proxy_name, ProxyName}, @@ -150,18 +153,21 @@ end_per_group(Group, Config) when Group =:= grpcv1_tcp; Group =:= grpcv1_tls -> + Apps = ?config(group_apps, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), EHttpcPoolName = ?config(ehttpc_pool_name, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ehttpc_sup:stop_pool(EHttpcPoolName), - delete_bridge(Config), - _ = application:stop(greptimedb), + emqx_cth_suite:stop(Apps), ok; end_per_group(_Group, _Config) -> ok. init_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), delete_all_rules(), delete_all_bridges(), Config. @@ -179,14 +185,6 @@ end_per_testcase(_Testcase, Config) -> %% Helper fns %%------------------------------------------------------------------------------ -start_apps() -> - %% some configs in emqx_conf app are mandatory - %% we want to make sure they are loaded before - %% ekka start in emqx_common_test_helpers:start_apps/1 - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]). - example_write_syntax() -> %% N.B.: this single space character is relevant <<"${topic},clientid=${clientid}", " ", "payload=${payload},", @@ -215,6 +213,7 @@ greptimedb_config(grpcv1 = Type, GreptimedbHost, GreptimedbPort, Config) -> " request_ttl = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" + " health_check_interval = 5s\n" " }\n" " ssl {\n" " enable = ~p\n" @@ -259,6 +258,7 @@ delete_bridge(Config) -> emqx_bridge:remove(Type, Name). delete_all_bridges() -> + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), lists:foreach( fun(#{name := Name, type := Type}) -> emqx_bridge:remove(Type, Name) @@ -692,6 +692,12 @@ t_boolean_variants(Config) -> {ok, _}, create_bridge(Config) ), + ResourceId = resource_id(Config), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), BoolVariants = #{ true => true, false => false, @@ -728,14 +734,22 @@ t_boolean_variants(Config) -> async -> ct:sleep(500); sync -> ok end, - PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config), - Expected = #{ - bool => Translation, - int_value => -123, - uint_value => 123, - payload => emqx_utils_json:encode(Payload) - }, - assert_persisted_data(ClientId, Expected, PersistedData), + ?retry( + _Sleep2 = 500, + _Attempts2 = 20, + begin + PersistedData = query_by_clientid( + atom_to_binary(?FUNCTION_NAME), ClientId, Config + ), + Expected = #{ + bool => Translation, + int_value => -123, + uint_value => 123, + payload => emqx_utils_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData) + end + ), ok end, BoolVariants @@ -841,6 +855,11 @@ t_get_status(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) end), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ok. t_create_disconnected(Config) -> @@ -859,6 +878,12 @@ t_create_disconnected(Config) -> ok end ), + ResourceId = resource_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 10, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), ok. t_start_error(Config) -> diff --git a/apps/emqx_conf/src/emqx_conf_schema_inject.erl b/apps/emqx_conf/src/emqx_conf_schema_inject.erl index 5dcba7b5e..0e0f36401 100644 --- a/apps/emqx_conf/src/emqx_conf_schema_inject.erl +++ b/apps/emqx_conf/src/emqx_conf_schema_inject.erl @@ -26,7 +26,7 @@ schemas(Edition) -> cluster_linking(Edition) ++ authn(Edition) ++ authz() ++ - customized(). + customized(Edition). auth_ext(ce) -> []; @@ -73,5 +73,5 @@ authz_mods() -> ]. %% Add more schemas here. -customized() -> +customized(_Edition) -> []. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c042054e3..c3b746d8e 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -809,17 +809,18 @@ maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_st maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> Data. -stop_resource(#data{state = ResState, id = ResId} = Data) -> +stop_resource(#data{id = ResId} = Data) -> %% We don't care about the return value of `Mod:on_stop/2'. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), %% Before stop is called we remove all the channels from the resource NewData = remove_channels(Data), - case ResState =/= undefined orelse HasAllocatedResources of + NewResState = NewData#data.state, + case NewResState =/= undefined orelse HasAllocatedResources of true -> %% we clear the allocated resources after stop is successful - emqx_resource:call_stop(NewData#data.id, NewData#data.mod, ResState); + emqx_resource:call_stop(NewData#data.id, NewData#data.mod, NewResState); false -> ok end, diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 51165e18f..7d0000a1c 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -42,7 +42,9 @@ func := builtin_action_func() | atom(), args => action_fun_args() } - | bridge_channel_id(). + | bridge_channel_id() + | {bridge_v2, emqx_bridge_v2:bridge_v2_type(), emqx_bridge_v2:bridge_v2_name()} + | {bridge, emqx_utils_maps:config_key(), emqx_utils_maps:config_key(), bridge_channel_id()}. -type rule() :: #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index 63b6fb65c..5afef1ca4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -95,6 +95,18 @@ format_action(BridgeChannelId) when is_binary(BridgeChannelId) -> io_lib:format("- Name: ~s\n" " Type: data-bridge\n" ,[BridgeChannelId] + ); +format_action({bridge, ActionType, ActionName, _Id}) -> + io_lib:format("- Name: ~p\n" + " Action Type: ~p\n" + " Type: data-bridge\n" + ,[ActionName, ActionType] + ); +format_action({bridge_v2, ActionType, ActionName}) -> + io_lib:format("- Name: ~p\n" + " Action Type: ~p\n" + " Type: data-bridge\n" + ,[ActionName, ActionType] ). left_pad(Str) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 395883e7b..680aac759 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -44,7 +44,8 @@ all() -> {group, metrics_simple}, {group, metrics_fail}, {group, metrics_fail_simple}, - {group, tracing} + {group, tracing}, + {group, command_line} ]. suite() -> @@ -147,6 +148,9 @@ groups() -> ]}, {tracing, [], [ t_trace_rule_id + ]}, + {command_line, [], [ + t_command_line_list_print_rule ]} ]. @@ -596,6 +600,26 @@ t_get_rule_ids_by_action(_) -> ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:not_exists">>)), ok = delete_rules_by_ids([<<"t_get_rule_ids_by_action">>]). +%% Check that command line interface don't crash when listing and showing rules +t_command_line_list_print_rule(_) -> + ID = <<"t_command_line">>, + Rule1 = #{ + id => ID, + sql => <<"SELECT * FROM \"t\"">>, + actions => [ + #{function => console, args => #{}}, + #{function => republish, args => #{}}, + <<"mqtt:my_mqtt_bridge">>, + <<"mysql:foo">> + ], + description => ID, + created_at => erlang:system_time(millisecond) + }, + ok = create_rules([Rule1]), + ok = emqx_rule_engine_cli:cmd(["list"]), + ok = emqx_rule_engine_cli:cmd(["show", binary_to_list(ID)]), + ok = delete_rules_by_ids([ID]). + t_ensure_action_removed(_) -> Id = <<"t_ensure_action_removed">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, diff --git a/changes/ce/fix-13290.en.md b/changes/ce/fix-13290.en.md new file mode 100644 index 000000000..cbc47c9de --- /dev/null +++ b/changes/ce/fix-13290.en.md @@ -0,0 +1,3 @@ +The following command previously printed nothing if the rule had a data bridge action attached to it. This is now fixed. + + $ bin/emqx ctl rules show rule_0hyd diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 80e7d3c0a..0bc96e822 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.7.0 +version: 5.7.1-alpha.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.7.0 +appVersion: 5.7.1-alpha.1 diff --git a/scripts/test/start-two-nodes-in-docker.sh b/scripts/test/start-two-nodes-in-docker.sh index b3a20982a..c2cc30d5e 100755 --- a/scripts/test/start-two-nodes-in-docker.sh +++ b/scripts/test/start-two-nodes-in-docker.sh @@ -177,6 +177,7 @@ backend emqx_wss_back server emqx-2 $NODE2:8083 check-send-proxy send-proxy-v2-ssl-cn EOF +HAPROXY_IMAGE='ghcr.io/haproxytech/haproxy-docker-alpine:2.4.27' haproxy_cid=$(docker run -d --name haproxy \ --net "$NET" \ @@ -184,8 +185,8 @@ haproxy_cid=$(docker run -d --name haproxy \ -v "$(pwd)/apps/emqx/etc/certs:/usr/local/etc/haproxy/certs" \ -w /usr/local/etc/haproxy \ "${HAPROXY_PORTS[@]}" \ - "public.ecr.aws/docker/library/haproxy:2.4" \ - bash -c 'set -euo pipefail; + "${HAPROXY_IMAGE}" \ + sh -c 'set -euo pipefail; cat certs/cert.pem certs/key.pem > /tmp/emqx.pem; haproxy -f haproxy.cfg')