diff --git a/Makefile b/Makefile index d4a3d1a6d..d6443ebf2 100644 --- a/Makefile +++ b/Makefile @@ -296,7 +296,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt)))) .PHONY: fmt fmt: $(REBAR) - @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}' + @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,priv,test}/**/*.{erl,hrl,app.src,eterm}' @$(SCRIPTS)/erlfmt -w 'rebar.config.erl' @mix format diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 2412de99e..09b435e6e 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,7 +32,7 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.1.1"). +-define(EMQX_RELEASE_CE, "5.1.2"). %% Enterprise edition -define(EMQX_RELEASE_EE, "5.1.1-alpha.2"). diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 2d1ba49eb..e13f60654 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -12,6 +12,7 @@ {emqx_cm,2}. {emqx_conf,1}. {emqx_conf,2}. +{emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_eviction_agent,1}. diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index a0b085adc..6962ec4cc 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -28,9 +28,9 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 01c01c2be..5cab3cbc5 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1844,9 +1844,7 @@ desc("stats") -> desc("authorization") -> "Settings for client authorization."; desc("mqtt") -> - "Global MQTT configuration.
" - "The configs here work as default values which can be overridden\n" - "in zone configs"; + "Global MQTT configuration."; desc("authz_cache") -> "Settings for the authorization cache."; desc("zone") -> diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index f449588cc..de790ab46 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, ecql ]}, {env, []}, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index cfb08f47b..a10361c51 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, clickhouse ]}, {env, []}, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index 824f5ee7b..ed5078432 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, erlcloud ]}, {env, []}, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 10722e6ce..53d36361c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge_http, ehttpc ]}, {env, []}, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 9cfe88b7e..7d50304b1 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -598,6 +598,7 @@ start_cluster(Cluster) -> end, Cluster ), + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -607,6 +608,11 @@ start_cluster(Cluster) -> Nodes ) end), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. wait_for_cluster_rpc(Node) -> diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index 0549dd020..2a800baca 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, hstreamdb_erl ]}, {env, []}, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 1e2ebbfb1..1763b252a 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -309,6 +309,13 @@ do_append_records(false, Producer, Record) -> msg => "HStreamDB producer sync append success", record => Record }); + %% the HStream is warming up or buzy, something are not ready yet, retry after a while + {error, {unavailable, _} = Reason} -> + {error, + {recoverable_error, #{ + msg => "HStreamDB is warming up or buzy, will retry after a moment", + reason => Reason + }}}; {error, Reason} = Err -> ?tp( hstreamdb_connector_query_return, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index f8097dec2..859f80f53 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -2,7 +2,7 @@ {description, "EMQX HTTP Bridge and Connector Application"}, {vsn, "0.1.1"}, {registered, []}, - {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]}, + {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index 71b95a40d..a612c225b 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, influxdb ]}, {env, []}, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index d26cbe873..b79c4c2ce 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -11,7 +11,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge_http, %% for module emqx_connector_http emqx_connector ]}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 87c1841e5..997b768b3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -7,7 +7,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, telemetry, wolff, brod, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 74fde6426..f1f2ce362 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1071,13 +1071,14 @@ cluster(Config) -> Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ - {apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]}, + {apps, [emqx_conf, emqx_rule_engine, emqx_bridge_kafka, emqx_bridge]}, {listener_ports, []}, {peer_mod, PeerModule}, {priv_data_dir, PrivDataDir}, {load_schema, true}, {start_autocluster, true}, {schema_mod, emqx_enterprise_schema}, + {load_apps, [emqx_machine]}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), @@ -1901,6 +1902,7 @@ t_cluster_node_down(Config) -> ?check_trace( begin {_N2, Opts2} = lists:nth(2, Cluster), + NumNodes = length(Cluster), Nodes = [N1, N2 | _] = lists:map( @@ -1925,6 +1927,11 @@ t_cluster_node_down(Config) -> 15_000 ), wait_for_cluster_rpc(N2), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end), {ok, _} = snabbkaffe:receive_events(SRef0), lists:foreach( @@ -1980,7 +1987,7 @@ t_cluster_node_down(Config) -> ?assertEqual(NPartitions, map_size(Assignments)), NumPublished = ets:info(TId, size), %% All published messages are eventually received. - Published = receive_published(#{n => NumPublished, timeout => 3_000}), + Published = receive_published(#{n => NumPublished, timeout => 10_000}), ct:pal("published:\n ~p", [Published]), ok end diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 42129bfc7..14aca1f75 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -5,8 +5,7 @@ {applications, [ kernel, stdlib, - emqx_resource, - emqx_bridge + emqx_resource ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index fa3ebd3c9..35bcc3fc4 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_mongodb ]}, {env, []}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 853e98eb5..052b271f8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,14 +1,13 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [ kernel, stdlib, emqx, emqx_resource, - emqx_bridge, emqtt ]}, {env, []}, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 2ecdd6a6a..252b8ff00 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_mysql ]}, {env, []}, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 6ec938afd..b106bb844 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, opentsdb ]}, {env, []}, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index a05533da3..4f46ce464 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, emqx_oracle ]}, {env, []}, diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 5c6eddb39..43eb22903 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -162,6 +162,9 @@ delete_all_bridges() -> sql_insert_template_for_bridge() -> "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${retain})". +sql_insert_template_with_nested_token_for_bridge() -> + "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload.msg}, ${retain})". + sql_create_table() -> "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))". @@ -533,6 +536,23 @@ t_start_stop(Config) -> ), ok. +t_probe_with_nested_tokens(Config) -> + ResourceId = resource_id(Config), + reset_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config, #{ + <<"sql">> => sql_insert_template_with_nested_token_for_bridge() + }) + ), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ). + t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index ade791a6d..85131baf0 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -5,8 +5,7 @@ {applications, [ kernel, stdlib, - emqx_resource, - emqx_bridge + emqx_resource ]}, {env, []}, {modules, []}, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index 99fb25c33..ed468a833 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, pulsar ]}, {env, []}, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 15d4b63d4..4f0f73732 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -547,6 +547,7 @@ start_cluster(Cluster) -> emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster ], + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -556,6 +557,11 @@ start_cluster(Cluster) -> Nodes ) end), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. kill_resource_managers() -> diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index e9ef4d524..c6c0c3897 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -6,8 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, - ecql, rabbit_common, amqp_client ]}, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index bc21adcad..b380bc86d 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -7,7 +7,6 @@ stdlib, emqx_connector, emqx_resource, - emqx_bridge, emqx_redis ]}, {env, []}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index e18b98e3a..e158a2e46 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise RocketMQ Bridge"}, {vsn, "0.1.3"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge, rocketmq]}, + {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 35f4587b0..3aa8b3b68 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise SQL Server Bridge"}, {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge, odbc]}, + {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index e4c946162..be57fa880 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -6,7 +6,6 @@ kernel, stdlib, emqx_resource, - emqx_bridge, tdengine ]}, {env, []}, diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index 7a4aeeb56..adb024591 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -2,7 +2,7 @@ {description, "EMQX Enterprise TimescaleDB Bridge"}, {vsn, "0.1.2"}, {registered, []}, - {applications, [kernel, stdlib, emqx_resource, emqx_bridge]}, + {applications, [kernel, stdlib, emqx_resource]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index d5f39bcb0..1efeb4d69 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -71,7 +71,7 @@ get_raw(KeyPath) -> %% @doc Returns all values in the cluster. -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> - {ResL, []} = emqx_conf_proto_v2:get_all(KeyPath), + {ResL, []} = emqx_conf_proto_v3:get_all(KeyPath), maps:from_list(ResL). %% @doc Returns the specified node's KeyPath, or exception if not found @@ -79,14 +79,14 @@ get_all(KeyPath) -> get_by_node(Node, KeyPath) when Node =:= node() -> emqx:get_config(KeyPath); get_by_node(Node, KeyPath) -> - emqx_conf_proto_v2:get_config(Node, KeyPath). + emqx_conf_proto_v3:get_config(Node, KeyPath). %% @doc Returns the specified node's KeyPath, or the default value if not found -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term(). get_by_node(Node, KeyPath, Default) when Node =:= node() -> emqx:get_config(KeyPath, Default); get_by_node(Node, KeyPath, Default) -> - emqx_conf_proto_v2:get_config(Node, KeyPath, Default). + emqx_conf_proto_v3:get_config(Node, KeyPath, Default). %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term(). @@ -101,7 +101,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts). + emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -114,7 +114,7 @@ update(KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); update(Node, KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts). + emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts). %% @doc Mark the specified key path as tombstone tombstone(KeyPath, Opts) -> @@ -124,7 +124,7 @@ tombstone(KeyPath, Opts) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - emqx_conf_proto_v2:remove_config(KeyPath, Opts). + emqx_conf_proto_v3:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -132,13 +132,13 @@ remove(KeyPath, Opts) -> remove(Node, KeyPath, Opts) when Node =:= node() -> emqx:remove_config(KeyPath, Opts#{override_to => local}); remove(Node, KeyPath, Opts) -> - emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts). + emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts). %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - emqx_conf_proto_v2:reset(KeyPath, Opts). + emqx_conf_proto_v3:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -146,7 +146,7 @@ reset(KeyPath, Opts) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - emqx_conf_proto_v2:reset(Node, KeyPath, Opts). + emqx_conf_proto_v3:reset(Node, KeyPath, Opts). %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 0a486c829..7addb3823 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -137,7 +137,7 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> - {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), + {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), case (Failed =/= [] orelse NotReady =/= []) of @@ -284,7 +284,7 @@ conf_sort({ok, _}, {ok, _}) -> false. sync_data_from_node(Node) -> - case emqx_conf_proto_v2:sync_data_from_node(Node) of + case emqx_conf_proto_v3:sync_data_from_node(Node) of {ok, DataBin} -> case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of {ok, []} -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b0a1a414d..fde3059d3 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -213,10 +213,20 @@ load_config(Bin, ReplaceOrMerge) when is_binary(Bin) -> load_config_from_raw(RawConf, ReplaceOrMerge) -> case check_config(RawConf) of ok -> - lists:foreach( - fun({K, V}) -> update_config_cluster(K, V, ReplaceOrMerge) end, - to_sorted_list(RawConf) - ); + Error = + lists:filtermap( + fun({K, V}) -> + case update_config_cluster(K, V, ReplaceOrMerge) of + ok -> false; + {error, Msg} -> {true, Msg} + end + end, + to_sorted_list(RawConf) + ), + case iolist_to_binary(Error) of + <<"">> -> ok; + ErrorBin -> {error, ErrorBin} + end; {error, ?UPDATE_READONLY_KEYS_PROHIBITED = Reason} -> emqx_ctl:warning("load config failed~n~ts~n", [Reason]), emqx_ctl:warning( @@ -234,34 +244,63 @@ load_config_from_raw(RawConf, ReplaceOrMerge) -> {error, Errors} end. -update_config_cluster(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) -> - check_res(Key, emqx_authz:merge(Conf)); -update_config_cluster(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) -> - check_res(Key, emqx_authn:merge_config(Conf)); -update_config_cluster(Key, NewConf, merge) -> +update_config_cluster(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) -> + check_res(Key, emqx_authz:merge(Conf), Conf, Mode); +update_config_cluster(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) -> + check_res(Key, emqx_authn:merge_config(Conf), Conf, Mode); +update_config_cluster(Key, NewConf, merge = Mode) -> Merged = merge_conf(Key, NewConf), - check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS)); -update_config_cluster(Key, Value, replace) -> - check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS)). + check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS), NewConf, Mode); +update_config_cluster(Key, Value, replace = Mode) -> + check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS), Value, Mode). -define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}). -update_config_local(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) -> - check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS)); -update_config_local(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) -> - check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS)); -update_config_local(Key, NewConf, merge) -> +update_config_local(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) -> + check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS), Conf, Mode); +update_config_local(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge = Mode) -> + check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS), Conf, Mode); +update_config_local(Key, NewConf, merge = Mode) -> Merged = merge_conf(Key, NewConf), - check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS)); -update_config_local(Key, Value, replace) -> - check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS)). + check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS), NewConf, Mode); +update_config_local(Key, Value, replace = Mode) -> + check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS), Value, Mode). -check_res(Key, Res) -> check_res(cluster, Key, Res). -check_res(Mode, Key, {ok, _} = Res) -> - emqx_ctl:print("load ~ts in ~p ok~n", [Key, Mode]), - Res; -check_res(_Mode, Key, {error, Reason} = Res) -> - emqx_ctl:warning("load ~ts failed~n~p~n", [Key, Reason]), - Res. +check_res(Key, Res, Conf, Mode) -> check_res(cluster, Key, Res, Conf, Mode). +check_res(Node, Key, {ok, _}, _Conf, _Mode) -> + emqx_ctl:print("load ~ts on ~p ok~n", [Key, Node]), + ok; +check_res(_Node, Key, {error, Reason}, Conf, Mode) -> + Warning = + "Can't ~ts the new configurations!~n" + "Root key: ~ts~n" + "Reason: ~p~n", + emqx_ctl:warning(Warning, [Mode, Key, Reason]), + ActiveMsg0 = + "The effective configurations:~n" + "```~n" + "~ts```~n~n", + ActiveMsg = io_lib:format(ActiveMsg0, [hocon_pp:do(#{Key => emqx_conf:get_raw([Key])}, #{})]), + FailedMsg0 = + "Try to ~ts with:~n" + "```~n" + "~ts```~n", + FailedMsg = io_lib:format(FailedMsg0, [Mode, hocon_pp:do(#{Key => Conf}, #{})]), + SuggestMsg = suggest_msg(Mode), + Msg = iolist_to_binary([ActiveMsg, FailedMsg, SuggestMsg]), + emqx_ctl:print("~ts", [Msg]), + {error, iolist_to_binary([Warning, Msg])}. + +suggest_msg(Mode) when Mode == merge orelse Mode == replace -> + RetryMode = + case Mode of + merge -> "replace"; + replace -> "merge" + end, + io_lib:format( + "Tips: There may be some conflicts in the new configuration under `~ts` mode,~n" + "Please retry with the `~ts` mode.~n", + [Mode, RetryMode] + ). check_config(Conf) -> case check_keys_is_not_readonly(Conf) of @@ -349,7 +388,7 @@ filter_readonly_config(Raw) -> reload_config(AllConf, ReplaceOrMerge) -> Fold = fun({Key, Conf}, Acc) -> case update_config_local(Key, Conf, ReplaceOrMerge) of - {ok, _} -> + ok -> Acc; Error -> ?SLOG(error, #{ diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl new file mode 100644 index 000000000..a2719bc8e --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -0,0 +1,119 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + sync_data_from_node/1, + get_config/2, + get_config/3, + get_all/1, + + update/3, + update/4, + remove_config/2, + remove_config/3, + + reset/2, + reset/3, + + get_override_config_file/1 +]). + +-export([get_hocon_config/1, get_hocon_config/2]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.1.1". + +-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +sync_data_from_node(Node) -> + rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). +-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. + +-spec get_config(node(), emqx_utils_maps:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update( + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update( + node(), + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). + +-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). +get_override_config_file(Nodes) -> + rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). + +-spec get_hocon_config(node()) -> map() | {badrpc, _}. +get_hocon_config(Node) -> + rpc:call(Node, emqx_conf_cli, get_config, []). + +-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. +get_hocon_config(Node, Key) -> + rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm new file mode 100644 index 000000000..500a47d8f --- /dev/null +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -0,0 +1,112 @@ +%% -*- mode: erlang; -*- +#{ + %% must always be of type `load' + db_apps => + [ + mnesia_rocksdb, + mnesia, + mria, + ekka + ], + system_apps => + [ + kernel, + sasl, + crypto, + public_key, + asn1, + syntax_tools, + ssl, + os_mon, + inets, + compiler, + runtime_tools, + redbug, + xmerl, + {hocon, load}, + telemetry + ], + %% must always be of type `load' + common_business_apps => + [ + emqx, + emqx_conf, + + esasl, + observer_cli, + tools, + covertool, + %% started by emqx_machine + system_monitor, + emqx_utils, + emqx_durable_storage, + emqx_http_lib, + emqx_resource, + emqx_connector, + emqx_authn, + emqx_authz, + emqx_auto_subscribe, + emqx_gateway, + emqx_gateway_stomp, + emqx_gateway_mqttsn, + emqx_gateway_coap, + emqx_gateway_lwm2m, + emqx_gateway_exproto, + emqx_exhook, + emqx_bridge, + emqx_bridge_mqtt, + emqx_bridge_http, + emqx_rule_engine, + emqx_modules, + emqx_management, + emqx_dashboard, + emqx_retainer, + emqx_prometheus, + emqx_psk, + emqx_slow_subs, + emqx_mongodb, + emqx_redis, + emqx_mysql, + emqx_plugins, + quicer, + bcrypt, + jq, + observer + ], + %% must always be of type `load' + ee_business_apps => + [ + emqx_license, + emqx_enterprise, + emqx_bridge_kafka, + emqx_bridge_pulsar, + emqx_bridge_gcp_pubsub, + emqx_bridge_cassandra, + emqx_bridge_opents, + emqx_bridge_clickhouse, + emqx_bridge_dynamo, + emqx_bridge_hstreamdb, + emqx_bridge_influxdb, + emqx_bridge_iotdb, + emqx_bridge_matrix, + emqx_bridge_mongodb, + emqx_bridge_mysql, + emqx_bridge_pgsql, + emqx_bridge_redis, + emqx_bridge_rocketmq, + emqx_bridge_tdengine, + emqx_bridge_timescale, + emqx_bridge_sqlserver, + emqx_bridge_kinesis, + emqx_oracle, + emqx_bridge_oracle, + emqx_bridge_rabbitmq, + emqx_schema_registry, + emqx_eviction_agent, + emqx_node_rebalance, + emqx_ft + ], + %% must always be of type `load' + ce_business_apps => + [emqx_telemetry] +}. diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 5cef7ce71..82b909b4f 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -16,6 +16,7 @@ -module(emqx_machine_boot). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([post_boot/0]). -export([stop_apps/0, ensure_apps_started/0]). @@ -24,7 +25,6 @@ -export([stop_port_apps/0]). -dialyzer({no_match, [basic_reboot_apps/0]}). --dialyzer({no_match, [basic_reboot_apps_edition/1]}). -ifdef(TEST). -export([sorted_reboot_apps/1, reboot_apps/0]). @@ -94,7 +94,8 @@ stop_one_app(App) -> ensure_apps_started() -> ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}), - lists:foreach(fun start_one_app/1, sorted_reboot_apps()). + lists:foreach(fun start_one_app/1, sorted_reboot_apps()), + ?tp(emqx_machine_boot_apps_started, #{}). start_one_app(App) -> ?SLOG(debug, #{msg => "starting_app", app => App}), @@ -128,45 +129,39 @@ reboot_apps() -> BaseRebootApps ++ ConfigApps. basic_reboot_apps() -> - ?BASIC_REBOOT_APPS ++ - [ - emqx_prometheus, - emqx_modules, - emqx_dashboard, - emqx_connector, - emqx_gateway, - emqx_resource, - emqx_rule_engine, - emqx_bridge, - emqx_management, - emqx_retainer, - emqx_exhook, - emqx_authn, - emqx_authz, - emqx_slow_subs, - emqx_auto_subscribe, - emqx_plugins, - emqx_psk, - emqx_durable_storage - ] ++ basic_reboot_apps_edition(emqx_release:edition()). + PrivDir = code:priv_dir(emqx_machine), + RebootListPath = filename:join([PrivDir, "reboot_lists.eterm"]), + {ok, [ + #{ + common_business_apps := CommonBusinessApps, + ee_business_apps := EEBusinessApps, + ce_business_apps := CEBusinessApps + } + ]} = file:consult(RebootListPath), + EditionSpecificApps = + case emqx_release:edition() of + ee -> EEBusinessApps; + ce -> CEBusinessApps; + _ -> [] + end, + BusinessApps = CommonBusinessApps ++ EditionSpecificApps, + ?BASIC_REBOOT_APPS ++ (BusinessApps -- excluded_apps()). -basic_reboot_apps_edition(ce) -> - [emqx_telemetry]; -basic_reboot_apps_edition(ee) -> - [ - emqx_license, - emqx_s3, - emqx_ft, - emqx_eviction_agent, - emqx_node_rebalance, - emqx_schema_registry - ]; -%% unexcepted edition, should not happen -basic_reboot_apps_edition(_) -> - []. +excluded_apps() -> + OptionalApps = [bcrypt, jq, observer], + [system_monitor, observer_cli] ++ + [App || App <- OptionalApps, not is_app(App)]. + +is_app(Name) -> + case application:load(Name) of + ok -> true; + {error, {already_loaded, _}} -> true; + _ -> false + end. sorted_reboot_apps() -> - Apps = [{App, app_deps(App)} || App <- reboot_apps()], + Apps0 = [{App, app_deps(App)} || App <- reboot_apps()], + Apps = inject_bridge_deps(Apps0), sorted_reboot_apps(Apps). app_deps(App) -> @@ -175,6 +170,25 @@ app_deps(App) -> {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) end. +%% `emqx_bridge' is special in that it needs all the bridges apps to +%% be started before it, so that, when it loads the bridges from +%% configuration, the bridge app and its dependencies need to be up. +inject_bridge_deps(RebootAppDeps) -> + BridgeApps = [ + App + || {App, _Deps} <- RebootAppDeps, + lists:prefix("emqx_bridge_", atom_to_list(App)) + ], + lists:map( + fun + ({emqx_bridge, Deps0}) when is_list(Deps0) -> + {emqx_bridge, Deps0 ++ BridgeApps}; + (App) -> + App + end, + RebootAppDeps + ). + sorted_reboot_apps(Apps) -> G = digraph:new(), try diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 6ea2b5549..2f261c0d5 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -179,7 +179,7 @@ get_sys_memory() -> end. node_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)). stopped_node_info(Node) -> {Node, #{node => Node, node_status => 'stopped', role => core}}. @@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) -> M#{K => iolist_to_binary(V)}. broker_info(Nodes) -> - emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)). + emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -446,7 +446,7 @@ do_call_client(ClientId, Req) -> %% @private call_client(Node, ClientId, Req) -> - unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)). + unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -459,7 +459,7 @@ do_list_subscriptions() -> throw(not_implemented). list_subscriptions(Node) -> - unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)). + unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) -> subscribe(emqx:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of + case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of + case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) -> -spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, channel_not_found}. unsubscribe_batch([Node | Nodes], ClientId, Topics) -> - case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of + case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); Re -> Re end; diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index f2e336d0f..5edf8c564 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -346,11 +346,10 @@ configs(get, #{query_string := QueryStr, headers := Headers}, _Req) -> configs(put, #{body := Conf, query_string := #{<<"mode">> := Mode}}, _Req) -> case emqx_conf_cli:load_config(Conf, Mode) of ok -> {200}; - {error, [{_, Reason}]} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}; - {error, Errors} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Errors)}} + {error, Msg} -> {400, #{<<"content-type">> => <<"text/plain">>}, Msg} end. -find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Perferences) > 0 -> +find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Preferences) > 0 -> AcceptVal = maps:get(<<"accept">>, Headers, <<"*/*">>), %% Multiple types, weighted with the quality value syntax: %% Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8 @@ -363,20 +362,27 @@ find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Per ), case lists:member(<<"*/*">>, Accepts) of true -> - {ok, lists:nth(1, Perferences)}; + {ok, lists:nth(1, Preferences)}; false -> - Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Perferences), + Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Preferences), case Found of - [] -> {error, no_suitalbe_accept}; + [] -> {error, no_suitable_accept}; _ -> {ok, lists:nth(1, Found)} end end. +%% To return a JSON formatted configuration file, which is used to be compatible with the already +%% implemented `GET /configs` in the old versions 5.0 and 5.1. +%% +%% In e5.1.1, we support to return a hocon configuration file by `get_configs_v2/1`. It's more +%% useful for the user to read or reload the configuration file via HTTP API. +%% +%% The `get_configs_v1/1` should be deprecated since 5.2.0. get_configs_v1(QueryStr) -> Node = maps:get(<<"node">>, QueryStr, node()), case lists:member(Node, emqx:running_nodes()) andalso - emqx_management_proto_v2:get_full_config(Node) + emqx_management_proto_v4:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), @@ -389,10 +395,13 @@ get_configs_v1(QueryStr) -> end. get_configs_v2(QueryStr) -> + Node = maps:get(<<"node">>, QueryStr, node()), Conf = case maps:find(<<"key">>, QueryStr) of - error -> emqx_conf_cli:get_config(); - {ok, Key} -> emqx_conf_cli:get_config(atom_to_binary(Key)) + error -> + emqx_conf_proto_v3:get_hocon_config(Node); + {ok, Key} -> + emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key)) end, { 200, diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 719d0913d..90fb1f98e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -515,7 +515,7 @@ list_listeners() -> lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]). list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v2:list_listeners(Node)). + wrap_rpc(emqx_management_proto_v4:list_listeners(Node)). listener_status_by_id(NodeL) -> Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index e0685b2ff..9692441a6 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -110,17 +110,21 @@ broker(_) -> %% @doc Cluster with other nodes cluster(["join", SNode]) -> - case ekka:join(ekka_node:parse_name(SNode)) of + case mria:join(ekka_node:parse_name(SNode)) of ok -> emqx_ctl:print("Join the cluster successfully.~n"), - cluster(["status"]); + %% FIXME: running status on the replicant immediately + %% after join produces stale output + mria_rlog:role() =:= core andalso + cluster(["status"]), + ok; ignore -> emqx_ctl:print("Ignore.~n"); {error, Error} -> emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error]) end; cluster(["leave"]) -> - case ekka:leave() of + case mria:leave() of ok -> emqx_ctl:print("Leave the cluster successfully.~n"), cluster(["status"]); @@ -128,7 +132,7 @@ cluster(["leave"]) -> emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error]) end; cluster(["force-leave", SNode]) -> - case ekka:force_leave(ekka_node:parse_name(SNode)) of + case mria:force_leave(ekka_node:parse_name(SNode)) of ok -> emqx_ctl:print("Remove the node from cluster successfully.~n"), cluster(["status"]); @@ -138,9 +142,9 @@ cluster(["force-leave", SNode]) -> emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error]) end; cluster(["status"]) -> - emqx_ctl:print("Cluster status: ~p~n", [ekka_cluster:info()]); + emqx_ctl:print("Cluster status: ~p~n", [cluster_info()]); cluster(["status", "--json"]) -> - Info = sort_map_list_fields(ekka_cluster:info()), + Info = sort_map_list_fields(cluster_info()), emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Info)]); cluster(_) -> emqx_ctl:usage([ @@ -158,9 +162,7 @@ sort_map_list_fields(Map) when is_map(Map) -> end, Map, maps:keys(Map) - ); -sort_map_list_fields(NotMap) -> - NotMap. + ). sort_map_list_field(Field, Map) -> case maps:get(Field, Map) of @@ -925,3 +927,26 @@ with_log(Fun, Msg) -> {error, Reason} -> emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason]) end. + +cluster_info() -> + RunningNodes = safe_call_mria(running_nodes, [], []), + StoppedNodes = safe_call_mria(cluster_nodes, [stopped], []), + #{ + running_nodes => RunningNodes, + stopped_nodes => StoppedNodes + }. + +%% CLI starts before mria, so we should handle errors gracefully: +safe_call_mria(Fun, Args, OnFail) -> + try + apply(mria, Fun, Args) + catch + EC:Err:Stack -> + ?SLOG(warning, #{ + msg => "Call to mria failed", + call => {mria, Fun, Args}, + EC => Err, + stacktrace => Stack + }), + OnFail + end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 43554c9ff..8ff3c1794 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -272,18 +272,22 @@ t_dashboard(_Config) -> t_configs_node({'init', Config}) -> Node = node(), meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end), - meck:expect( - emqx_management_proto_v2, - get_full_config, - fun - (Node0) when Node0 =:= Node -> <<"\"self\"">>; - (other_node) -> <<"\"other\"">>; - (bad_node) -> {badrpc, bad} - end - ), + F = fun + (Node0) when Node0 =:= Node -> <<"\"self\"">>; + (other_node) -> <<"\"other\"">>; + (bad_node) -> {badrpc, bad} + end, + F2 = fun + (Node0, _) when Node0 =:= Node -> <<"log=1">>; + (other_node, _) -> <<"log=2">>; + (bad_node, _) -> {badrpc, bad} + end, + meck:expect(emqx_management_proto_v4, get_full_config, F), + meck:expect(emqx_conf_proto_v3, get_hocon_config, F2), + meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v2]); + meck:unload([emqx, emqx_management_proto_v4, emqx_conf_proto_v3, hocon_pp]); t_configs_node(_) -> Node = atom_to_list(node()), @@ -296,7 +300,10 @@ t_configs_node(_) -> {_, _, Body} = ExpRes, ?assertMatch(#{<<"code">> := <<"NOT_FOUND">>}, emqx_utils_json:decode(Body, [return_maps])), - ?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")). + ?assertMatch({error, {_, 500, _}}, get_configs_with_json("bad_node")), + + ?assertEqual({ok, #{<<"log">> => 1}}, get_configs_with_binary("log", Node)), + ?assertEqual({ok, #{<<"log">> => 2}}, get_configs_with_binary("log", "other_node")). %% v2 version binary t_configs_key(_Config) -> @@ -386,12 +393,16 @@ get_configs_with_json(Node, Opts) -> end. get_configs_with_binary(Key) -> + get_configs_with_binary(Key, atom_to_list(node())). + +get_configs_with_binary(Key, Node) -> + Path0 = "configs?node=" ++ Node, Path = case Key of - undefined -> ["configs"]; - _ -> ["configs?key=" ++ Key] + undefined -> Path0; + _ -> Path0 ++ "&key=" ++ Key end, - URI = emqx_mgmt_api_test_util:api_path(Path), + URI = emqx_mgmt_api_test_util:api_path([Path]), Auth = emqx_mgmt_api_test_util:auth_header_(), Headers = [{"accept", "text/plain"}, Auth], case emqx_mgmt_api_test_util:request_api(get, URI, [], Headers, [], #{return_all => true}) of diff --git a/apps/emqx_modules/src/emqx_modules_conf.erl b/apps/emqx_modules/src/emqx_modules_conf.erl index e5604280d..ebe9b83df 100644 --- a/apps/emqx_modules/src/emqx_modules_conf.erl +++ b/apps/emqx_modules/src/emqx_modules_conf.erl @@ -168,10 +168,11 @@ post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) -> } = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end), Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed], Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added], - Errs = [Res || Res <- Registered ++ Deregistered, Res =/= ok], - case Errs of + DeregisteredErrs = [Res || Res <- Deregistered, Res =/= ok, Res =/= {error, topic_not_found}], + RegisteredErrs = [Res || Res <- Registered, Res =/= ok, Res =/= {error, already_existed}], + case DeregisteredErrs ++ RegisteredErrs of [] -> ok; - _ -> {error, Errs} + Errs -> {error, Errs} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index fb27c0a30..430ad1e34 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -506,7 +506,7 @@ fields(local_status_enabled) -> )}, {"process", mk( - hoconsc:union([rebalance, evacuation]), + hoconsc:enum([rebalance, evacuation]), #{ desc => ?DESC(local_status_process), required => true diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index a5ca822e8..be3ed3276 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 5a7f8d752..b33122fb7 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -9,6 +9,10 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(UNHEALTHY_TARGET_MSG, + "Oracle table is invalid. Please check if the table exists in Oracle Database." +). + %%==================================================================== %% Exports %%==================================================================== @@ -239,7 +243,7 @@ on_get_status(_InstId, #{pool_name := Pool} = State) -> {connected, NState}; {error, {undefined_table, NState}} -> %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, unhealthy_target}; + {disconnected, NState, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn connecting @@ -408,7 +412,19 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) whe Error end. -check_if_table_exists(Conn, SQL, Tokens) -> +check_if_table_exists(Conn, SQL, Tokens0) -> + % Discard nested tokens for checking if table exist. As payload here is defined as + % a single string, it would fail if Token is, for instance, ${payload.msg}, causing + % bridge probe to fail. + Tokens = lists:map( + fun + ({var, [Token | _DiscardedDeepTokens]}) -> + {var, [Token]}; + (Token) -> + Token + end, + Tokens0 + ), {Event, _Headers} = emqx_rule_events:eventmsg_publish( emqx_message:make(<<"t/opic">>, "test query") ), diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index 322d3ab75..0db2ea3c6 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -364,11 +364,7 @@ cluster(Config) -> {load_schema, true}, {start_autocluster, true}, {schema_mod, emqx_enterprise_schema}, - %% need to restart schema registry app in the tests so - %% that it re-registers the config handler that is lost - %% when emqx_conf restarts during join. - {env, [{emqx_machine, applications, [emqx_schema_registry]}]}, - {load_apps, [emqx_machine | ?APPS]}, + {load_apps, [emqx_machine]}, {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), @@ -388,6 +384,7 @@ start_cluster(Cluster) -> emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster ], + NumNodes = length(Nodes), on_exit(fun() -> emqx_utils:pmap( fun(N) -> @@ -397,7 +394,11 @@ start_cluster(Cluster) -> Nodes ) end), - erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]), + {ok, _} = snabbkaffe:block_until( + %% -1 because only those that join the first node will emit the event. + ?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}), + 30_000 + ), Nodes. wait_for_cluster_rpc(Node) -> @@ -658,7 +659,7 @@ t_cluster_serde_build(Config) -> Nodes = [N1, N2 | _] = start_cluster(Cluster), NumNodes = length(Nodes), wait_for_cluster_rpc(N2), - ?assertEqual( + ?assertMatch( ok, erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema]) ), @@ -687,7 +688,7 @@ t_cluster_serde_build(Config) -> {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := schema_registry_serdes_deleted}), NumNodes, - 5_000 + 10_000 ), ?assertEqual( ok, diff --git a/changes/ce/fix-11294.en.md b/changes/ce/fix-11294.en.md new file mode 100644 index 000000000..c9ee1cdc5 --- /dev/null +++ b/changes/ce/fix-11294.en.md @@ -0,0 +1 @@ +Fix `emqx_ctl cluster join`, `leave`, and `status` commands. diff --git a/changes/ce/fix-11309.en.md b/changes/ce/fix-11309.en.md new file mode 100644 index 000000000..5f028be55 --- /dev/null +++ b/changes/ce/fix-11309.en.md @@ -0,0 +1,2 @@ +Improve startup order of EMQX applications. +Simplify build scripts and improve code reuse. diff --git a/changes/ce/fix-11322.en.md b/changes/ce/fix-11322.en.md new file mode 100644 index 000000000..fca72c01a --- /dev/null +++ b/changes/ce/fix-11322.en.md @@ -0,0 +1,4 @@ +Import additional configurations from EMQX backup file (`emqx ctl import` command): + - rule_engine (previously not imported due to the bug) + - topic_metrics (previously not implemented) + - slow_subs (previously not implemented). diff --git a/changes/ee/fix-11307.en.md b/changes/ee/fix-11307.en.md new file mode 100644 index 000000000..9b164f17d --- /dev/null +++ b/changes/ee/fix-11307.en.md @@ -0,0 +1 @@ +Fixed check for table existence to return a more friendly message in the Oracle bridge. diff --git a/changes/v5.1.2.en.md b/changes/v5.1.2.en.md new file mode 100644 index 000000000..a6b7d127b --- /dev/null +++ b/changes/v5.1.2.en.md @@ -0,0 +1,72 @@ +# v5.1.2 + +## Enhancements + +- [#11124](https://github.com/emqx/emqx/pull/11124) Release packages for Amazon Linux 2023 + +- [#11226](https://github.com/emqx/emqx/pull/11226) Unify the listener switch to `enable`, while being compatible with the previous `enabled`. + +- [#11249](https://github.com/emqx/emqx/pull/11249) Support HTTP API for setting alarm watermark of license. + +- [#11251](https://github.com/emqx/emqx/pull/11251) Add `/cluster/topology` HTTP API endpoint + + `GET` request to the endpoint returns the cluster topology: connections between RLOG core and replicant nodes. + +- [#11253](https://github.com/emqx/emqx/pull/11253) The Webhook/HTTP bridge has been refactored to its own Erlang application. This allows for more flexibility in the future, and also allows for the bridge to be run as a standalone application. + +- [#11289](https://github.com/emqx/emqx/pull/11289) Release packages for Debian 12. + +- [#11290](https://github.com/emqx/emqx/pull/11290) Updated `jq` dependency to version 0.3.10 which includes `oniguruma` library update to version 6.9.8 with few minor security fixes. + +- [#11291](https://github.com/emqx/emqx/pull/11291) Updated RocksDB version to 1.8.0-emqx-1 via ekka update to 0.15.6. + +- [#11236](https://github.com/emqx/emqx/pull/11236) Improve the speed of clients querying in HTTP API `/clients` endpoint with default parameters + +## Bug Fixes + +- [#11065](https://github.com/emqx/emqx/pull/11065) Avoid logging irrelevant error messages during EMQX shutdown. + +- [#11077](https://github.com/emqx/emqx/pull/11077) Fixes crash when updating binding with a non-integer port. + +- [#11184](https://github.com/emqx/emqx/pull/11184) Config value for `max_packet_size` has a max value of 256MB defined by protocol. This is now enforced and any configuration with a value greater than that will break. + +- [#11192](https://github.com/emqx/emqx/pull/11192) Fix produces valid HOCON file when atom type is used. + Remove unnecessary `"` from HOCON file. + +- [#11195](https://github.com/emqx/emqx/pull/11195) Avoid to create duplicated subscription by HTTP API or client in Stomp gateway + +- [#11206](https://github.com/emqx/emqx/pull/11206) Make the username and password params of CoAP client to optional in connection mode. + +- [#11208](https://github.com/emqx/emqx/pull/11208) Fix the issue of abnormal data statistics for LwM2M client. + +- [#11211](https://github.com/emqx/emqx/pull/11211) Consistently return `404` for `DELETE` operations on non-existent resources. + +- [#11214](https://github.com/emqx/emqx/pull/11214) Fix a bug where node configuration may fail to synchronize correctly when joining the cluster. + +- [#11229](https://github.com/emqx/emqx/pull/11229) Fixed an issue preventing plugins from starting/stopping after changing configuration via `emqx ctl conf load`. + +- [#11237](https://github.com/emqx/emqx/pull/11237) The `headers` default value in /prometheus API should be a map instead of a list. + +- [#11250](https://github.com/emqx/emqx/pull/11250) Fix while a WebSocket packet contains more than one MQTT packet, the order of MQTT packets will be reversed. + + +- [#11271](https://github.com/emqx/emqx/pull/11271) Ensure that the range of percentage type is from 0% to 100%. + +- [#11272](https://github.com/emqx/emqx/pull/11272) Fix a typo in the log, when EMQX received an abnormal `PUBREL` packet, the `pubrel` was mistakenly typo as `pubrec`. + +- [#11281](https://github.com/emqx/emqx/pull/11281) Restored support for the special `$queue/` shared subscription. + +- [#11294](https://github.com/emqx/emqx/pull/11294) Fix `emqx_ctl cluster join`, `leave`, and `status` commands. + +- [#11296](https://github.com/emqx/emqx/pull/11296) Import additional configurations from EMQX backup file (`emqx ctl import` command): + - rule_engine (previously not imported due to the bug) + - topic_metrics (previously not implemented) + - slow_subs (previously not implemented). + +- [#11309](https://github.com/emqx/emqx/pull/11309) Improve startup order of EMQX applications. + Simplify build scripts and improve code reuse. + +- [#11322](https://github.com/emqx/emqx/pull/11322) Import additional configurations from EMQX backup file (`emqx ctl import` command): + - rule_engine (previously not imported due to the bug) + - topic_metrics (previously not implemented) + - slow_subs (previously not implemented). diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index a2262da8b..67166708b 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.1.1 +version: 5.1.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.1.1 +appVersion: 5.1.2 diff --git a/mix.exs b/mix.exs index f14fd1a91..cf29a9c3e 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.15.6", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.15.7", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true}, @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.13", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.14", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, @@ -298,6 +298,7 @@ defmodule EMQXUmbrella.MixProject do [ applications: applications(edition_type), skip_mode_validation_for: [ + :emqx_mix, :emqx_gateway, :emqx_gateway_stomp, :emqx_gateway_mqttsn, @@ -317,7 +318,10 @@ defmodule EMQXUmbrella.MixProject do :emqx_auto_subscribe, :emqx_slow_subs, :emqx_plugins, - :emqx_ft + :emqx_ft, + :emqx_s3, + :emqx_durable_storage, + :rabbit_common ], steps: steps, strip_beams: false @@ -327,113 +331,54 @@ defmodule EMQXUmbrella.MixProject do end def applications(edition_type) do - [ - crypto: :permanent, - public_key: :permanent, - asn1: :permanent, - syntax_tools: :permanent, - ssl: :permanent, - os_mon: :permanent, - inets: :permanent, - compiler: :permanent, - runtime_tools: :permanent, - redbug: :permanent, - xmerl: :permanent, - hocon: :load, - telemetry: :permanent, - emqx: :load, - emqx_conf: :load, - emqx_machine: :permanent - ] ++ - if(enable_rocksdb?(), - do: [mnesia_rocksdb: :load], - else: [] - ) ++ - [ - mnesia: :load, - ekka: :load, - esasl: :load, - observer_cli: :permanent, - tools: :permanent, - covertool: :load, - system_monitor: :load, - emqx_utils: :load, - emqx_http_lib: :permanent, - emqx_resource: :permanent, - emqx_connector: :permanent, - emqx_authn: :permanent, - emqx_authz: :permanent, - emqx_auto_subscribe: :permanent, - emqx_gateway: :permanent, - emqx_gateway_stomp: :permanent, - emqx_gateway_mqttsn: :permanent, - emqx_gateway_coap: :permanent, - emqx_gateway_lwm2m: :permanent, - emqx_gateway_exproto: :permanent, - emqx_exhook: :permanent, - emqx_bridge: :permanent, - emqx_bridge_mqtt: :permanent, - emqx_bridge_http: :permanent, - emqx_rule_engine: :permanent, - emqx_modules: :permanent, - emqx_management: :permanent, - emqx_dashboard: :permanent, - emqx_retainer: :permanent, - emqx_prometheus: :permanent, - emqx_psk: :permanent, - emqx_slow_subs: :permanent, - emqx_mongodb: :permanent, - emqx_redis: :permanent, - emqx_mysql: :permanent, - emqx_plugins: :permanent, - emqx_mix: :none - ] ++ - if(enable_quicer?(), do: [quicer: :permanent], else: []) ++ - if(enable_bcrypt?(), do: [bcrypt: :permanent], else: []) ++ - if(enable_jq?(), do: [jq: :load], else: []) ++ - if(is_app(:observer), - do: [observer: :load], - else: [] - ) ++ - if(edition_type == :enterprise, - do: [ - emqx_license: :permanent, - emqx_enterprise: :load, - emqx_bridge_kafka: :permanent, - emqx_bridge_pulsar: :permanent, - emqx_bridge_gcp_pubsub: :permanent, - emqx_bridge_cassandra: :permanent, - emqx_bridge_opents: :permanent, - emqx_bridge_clickhouse: :permanent, - emqx_bridge_dynamo: :permanent, - emqx_bridge_hstreamdb: :permanent, - emqx_bridge_influxdb: :permanent, - emqx_bridge_iotdb: :permanent, - emqx_bridge_matrix: :permanent, - emqx_bridge_mongodb: :permanent, - emqx_bridge_mysql: :permanent, - emqx_bridge_pgsql: :permanent, - emqx_bridge_redis: :permanent, - emqx_bridge_rocketmq: :permanent, - emqx_bridge_tdengine: :permanent, - emqx_bridge_timescale: :permanent, - emqx_bridge_sqlserver: :permanent, - emqx_oracle: :permanent, - emqx_bridge_oracle: :permanent, - emqx_bridge_rabbitmq: :permanent, - emqx_schema_registry: :permanent, - emqx_eviction_agent: :permanent, - emqx_node_rebalance: :permanent, - emqx_ft: :permanent, - emqx_bridge_kinesis: :permanent - ], - else: [ - emqx_telemetry: :permanent - ] - ) + {:ok, + [ + %{ + db_apps: db_apps, + system_apps: system_apps, + common_business_apps: common_business_apps, + ee_business_apps: ee_business_apps, + ce_business_apps: ce_business_apps + } + ]} = :file.consult("apps/emqx_machine/priv/reboot_lists.eterm") + + edition_specific_apps = + if edition_type == :enterprise do + ee_business_apps + else + ce_business_apps + end + + business_apps = common_business_apps ++ edition_specific_apps + + excluded_apps = excluded_apps() + + system_apps = + Enum.map(system_apps, fn app -> + if is_atom(app), do: {app, :permanent}, else: app + end) + + db_apps = Enum.map(db_apps, &{&1, :load}) + business_apps = Enum.map(business_apps, &{&1, :load}) + + [system_apps, db_apps, [emqx_machine: :permanent], business_apps] + |> List.flatten() + |> Keyword.reject(fn {app, _type} -> app in excluded_apps end) end - defp is_app(name) do + defp excluded_apps() do + %{ + mnesia_rocksdb: enable_rocksdb?(), + quicer: enable_quicer?(), + bcrypt: enable_bcrypt?(), + jq: enable_jq?(), + observer: is_app?(:observer) + } + |> Enum.reject(&elem(&1, 1)) + |> Enum.map(&elem(&1, 0)) + end + + defp is_app?(name) do case Application.load(name) do :ok -> true diff --git a/rebar.config b/rebar.config index ea13c6caf..08d817771 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.6"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.7"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}} @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.13"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 7bd0f0548..21feada3d 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -387,74 +387,37 @@ overlay_vars_pkg(pkg) -> ]. relx_apps(ReleaseType, Edition) -> - [ - kernel, - sasl, - crypto, - public_key, - asn1, - syntax_tools, - ssl, - os_mon, - inets, - compiler, - runtime_tools, - redbug, - xmerl, - {hocon, load}, - telemetry, - % started by emqx_machine - {emqx, load}, - {emqx_conf, load}, - emqx_machine - ] ++ - [{mnesia_rocksdb, load} || is_rocksdb_supported()] ++ - [ - {mnesia, load}, - {ekka, load}, - {esasl, load}, - observer_cli, - tools, - {covertool, load}, - % started by emqx_machine - {system_monitor, load}, - {emqx_utils, load}, - emqx_http_lib, - emqx_resource, - emqx_connector, - emqx_authn, - emqx_authz, - emqx_auto_subscribe, - emqx_gateway, - emqx_gateway_stomp, - emqx_gateway_mqttsn, - emqx_gateway_coap, - emqx_gateway_lwm2m, - emqx_gateway_exproto, - emqx_exhook, - emqx_bridge, - emqx_bridge_mqtt, - emqx_bridge_http, - emqx_rule_engine, - emqx_modules, - emqx_management, - emqx_dashboard, - emqx_retainer, - emqx_prometheus, - emqx_psk, - emqx_slow_subs, - emqx_mongodb, - emqx_redis, - emqx_mysql, - emqx_plugins - ] ++ - [quicer || is_quicer_supported()] ++ - [bcrypt || provide_bcrypt_release(ReleaseType)] ++ - %% Started automatically when needed (only needs to be started when the - %% port implementation is used) - [{jq, load} || is_jq_supported()] ++ - [{observer, load} || is_app(observer)] ++ - relx_apps_per_edition(Edition). + {ok, [ + #{ + db_apps := DBApps, + system_apps := SystemApps, + common_business_apps := CommonBusinessApps, + ee_business_apps := EEBusinessApps, + ce_business_apps := CEBusinessApps + } + ]} = file:consult("apps/emqx_machine/priv/reboot_lists.eterm"), + EditionSpecificApps = + case Edition of + ee -> EEBusinessApps; + ce -> CEBusinessApps + end, + BusinessApps = CommonBusinessApps ++ EditionSpecificApps, + ExcludedApps = excluded_apps(ReleaseType), + SystemApps ++ + %% EMQX starts the DB and the business applications: + [{App, load} || App <- (DBApps -- ExcludedApps)] ++ + [emqx_machine] ++ + [{App, load} || App <- (BusinessApps -- ExcludedApps)]. + +excluded_apps(ReleaseType) -> + OptionalApps = [ + {quicer, is_quicer_supported()}, + {bcrypt, provide_bcrypt_release(ReleaseType)}, + {jq, is_jq_supported()}, + {observer, is_app(observer)}, + {mnesia_rocksdb, is_rocksdb_supported()} + ], + [App || {App, false} <- OptionalApps]. is_app(Name) -> case application:load(Name) of @@ -463,41 +426,6 @@ is_app(Name) -> _ -> false end. -relx_apps_per_edition(ee) -> - [ - emqx_license, - {emqx_enterprise, load}, - emqx_bridge_kafka, - emqx_bridge_pulsar, - emqx_bridge_gcp_pubsub, - emqx_bridge_cassandra, - emqx_bridge_opents, - emqx_bridge_clickhouse, - emqx_bridge_dynamo, - emqx_bridge_hstreamdb, - emqx_bridge_influxdb, - emqx_bridge_iotdb, - emqx_bridge_matrix, - emqx_bridge_mongodb, - emqx_bridge_mysql, - emqx_bridge_pgsql, - emqx_bridge_redis, - emqx_bridge_rocketmq, - emqx_bridge_tdengine, - emqx_bridge_timescale, - emqx_bridge_sqlserver, - emqx_oracle, - emqx_bridge_oracle, - emqx_bridge_rabbitmq, - emqx_schema_registry, - emqx_eviction_agent, - emqx_node_rebalance, - emqx_ft, - emqx_bridge_kinesis - ]; -relx_apps_per_edition(ce) -> - [emqx_telemetry]. - relx_overlay(ReleaseType, Edition) -> [ {mkdir, "log/"}, diff --git a/rel/i18n/emqx_mgmt_api_configs.hocon b/rel/i18n/emqx_mgmt_api_configs.hocon index 47852349e..44d461081 100644 --- a/rel/i18n/emqx_mgmt_api_configs.hocon +++ b/rel/i18n/emqx_mgmt_api_configs.hocon @@ -23,14 +23,14 @@ rest_conf_query.label: """Reset the config entry with query""" get_global_zone_configs.desc: -"""Get the global zone configs""" +"""Get the MQTT-related configuration""" get_global_zone_configs.label: -"""Get the global zone configs""" +"""Get the MQTT-related configuration""" update_global_zone_configs.desc: -"""Update global zone configs""" +"""Update MQTT-related configuration""" update_global_zone_configs.label: -"""Update global zone configs""" +"""Update MQTT-related configuration""" get_node_level_limiter_configs.desc: """Get the node-level limiter configs""" diff --git a/rel/i18n/emqx_mgmt_api_key_schema.hocon b/rel/i18n/emqx_mgmt_api_key_schema.hocon index 0dc11c7ac..c217dc2db 100644 --- a/rel/i18n/emqx_mgmt_api_key_schema.hocon +++ b/rel/i18n/emqx_mgmt_api_key_schema.hocon @@ -7,12 +7,11 @@ api_key.label: """API Key""" bootstrap_file.desc: -"""Bootstrap file is used to add an api_key when emqx is launched, - the format is: - ``` - 7e729ae70d23144b:2QILI9AcQ9BYlVqLDHQNWN2saIjBV4egr1CZneTNKr9CpK - ec3907f865805db0:Ee3taYltUKtoBVD9C3XjQl9C6NXheip8Z9B69BpUv5JxVHL - ```""" +"""The bootstrap file provides API keys for EMQX. +EMQX will load these keys on startup to authorize API requests. +It contains key-value pairs in the format:`api_key:api_secret`. +Each line specifies an API key and its associated secret. +""" bootstrap_file.label: """Initialize api_key file."""