diff --git a/Makefile b/Makefile index 2cabaebcf..3d582fafd 100644 --- a/Makefile +++ b/Makefile @@ -222,6 +222,10 @@ run: $(PROFILE) quickrun quickrun: ./_build/$(PROFILE)/rel/emqx/bin/emqx console +## Take the currently set PROFILE +docker: + @$(BUILD) $(PROFILE) docker + ## docker target is to create docker instructions .PHONY: $(REL_PROFILES:%=%-docker) $(REL_PROFILES:%=%-elixir-docker) define gen-docker-target diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index e4a46743b..798153a57 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,9 +27,9 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.1"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 7000ffe0a..bb4520aa9 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -872,12 +872,6 @@ fields("mqtt_quic_listener") -> ?MAX_UINT(64), ?DESC(fields_mqtt_quic_listener_max_bytes_per_key) )}, - {"handshake_idle_timeout_ms", - quic_lowlevel_settings_uint( - 1, - ?MAX_UINT(64), - ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout) - )}, {"tls_server_max_send_buffer", quic_lowlevel_settings_uint( 1, diff --git a/apps/emqx/test/emqx_broker_helper_SUITE.erl b/apps/emqx/test/emqx_broker_helper_SUITE.erl index 7c3e1aa91..65603410a 100644 --- a/apps/emqx/test/emqx_broker_helper_SUITE.erl +++ b/apps/emqx/test/emqx_broker_helper_SUITE.erl @@ -55,11 +55,12 @@ t_register_sub(_) -> ?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)). t_shard_seq(_) -> - ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)), - emqx_broker_helper:create_seq(<<"topic">>), - ?assertEqual([{<<"topic">>, 1}], ets:lookup(emqx_subseq, <<"topic">>)), - emqx_broker_helper:reclaim_seq(<<"topic">>), - ?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)). + TestTopic = atom_to_list(?FUNCTION_NAME), + ?assertEqual([], ets:lookup(emqx_subseq, TestTopic)), + emqx_broker_helper:create_seq(TestTopic), + ?assertEqual([{TestTopic, 1}], ets:lookup(emqx_subseq, TestTopic)), + emqx_broker_helper:reclaim_seq(TestTopic), + ?assertEqual([], ets:lookup(emqx_subseq, TestTopic)). t_shards_num(_) -> ?assertEqual(emqx_vm:schedulers() * 32, emqx_broker_helper:shards_num()). diff --git a/apps/emqx/test/emqx_olp_SUITE.erl b/apps/emqx/test/emqx_olp_SUITE.erl index b1cfbc0f1..87393686e 100644 --- a/apps/emqx/test/emqx_olp_SUITE.erl +++ b/apps/emqx/test/emqx_olp_SUITE.erl @@ -22,18 +22,23 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("lc/include/lc.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> emqx_common_test_helpers:start_apps([]), - Config. + OldSch = erlang:system_flag(schedulers_online, 1), + [{old_sch, OldSch} | Config]. -end_per_suite(_Config) -> +end_per_suite(Config) -> + erlang:system_flag(schedulers_online, ?config(old_sch, Config)), emqx_common_test_helpers:stop_apps([]). init_per_testcase(_, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), emqx_olp:enable(), case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of true -> ok; @@ -86,6 +91,7 @@ t_overload_cooldown_conn(Config) -> t_overloaded_conn(Config), timer:sleep(1000), ?assert(not emqx_olp:is_overloaded()), + true = emqx:is_running(node()), {ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]), ?assertMatch({ok, _Pid}, emqtt:connect(C)), emqtt:stop(C). @@ -93,7 +99,7 @@ t_overload_cooldown_conn(Config) -> -spec burst_runq() -> ParentToKill :: pid(). burst_runq() -> NProc = erlang:system_info(schedulers_online), - spawn(?MODULE, worker_parent, [NProc * 10, {?MODULE, busy_loop, []}]). + spawn(?MODULE, worker_parent, [NProc * 1000, {?MODULE, busy_loop, []}]). %% internal helpers worker_parent(N, {M, F, A}) -> diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index a95597f07..a1b7db12f 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -541,7 +541,6 @@ t_multi_streams_packet_boundary(Config) -> [{qos, PubQos}], undefined ), - timer:sleep(300), PubRecvs = recv_pub(3, [], 1000), ?assertMatch( [ @@ -668,12 +667,10 @@ t_multi_streams_packet_malform(Config) -> {error, stm_send_error, aborted} -> ok end, - timer:sleep(200), ?assert(is_list(emqtt:info(C))), {error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>), - timer:sleep(200), ?assert(is_list(emqtt:info(C))), ok = emqtt:disconnect(C). @@ -743,7 +740,6 @@ t_multi_streams_packet_too_large(Config) -> [{qos, PubQos}], undefined ), - timer:sleep(200), ?assert(is_list(emqtt:info(C))), timeout = recv_pub(1), @@ -757,7 +753,6 @@ t_multi_streams_packet_too_large(Config) -> [{qos, PubQos}], undefined ), - timer:sleep(200), timeout = recv_pub(1), ?assert(is_list(emqtt:info(C))), @@ -783,7 +778,6 @@ t_multi_streams_packet_too_large(Config) -> topic := Topic }} ] = recv_pub(1), - timer:sleep(200), ?assert(is_list(emqtt:info(C))), @@ -1409,7 +1403,7 @@ t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) -> {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 100), timer:sleep(200), - %% Client should be closed + %% Client should not be closed ?assert(is_list(emqtt:info(C))). t_multi_streams_emqx_ctrl_kill(Config) -> @@ -1462,9 +1456,8 @@ t_multi_streams_emqx_ctrl_kill(Config) -> [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), exit(TransPid, kill), - timer:sleep(200), %% Client should be closed - ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + assert_client_die(C). t_multi_streams_emqx_ctrl_exit_normal(Config) -> erlang:process_flag(trap_exit, true), @@ -1516,9 +1509,8 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) -> [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), emqx_connection:stop(TransPid), - timer:sleep(200), %% Client exit normal. - ?assertMatch({'EXIT', {normal, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + assert_client_die(C). t_multi_streams_remote_shutdown(Config) -> erlang:process_flag(trap_exit, true), @@ -1570,9 +1562,8 @@ t_multi_streams_remote_shutdown(Config) -> ok = stop_emqx(), start_emqx_quic(?config(port, Config)), - timer:sleep(200), %% Client should be closed - ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + assert_client_die(C). t_multi_streams_remote_shutdown_with_reconnect(Config) -> erlang:process_flag(trap_exit, true), @@ -1973,6 +1964,9 @@ test_dir(Config) -> recv_pub(Count) -> recv_pub(Count, [], 100). +recv_pub(Count, Tout) -> + recv_pub(Count, [], Tout). + recv_pub(0, Acc, _Tout) -> lists:reverse(Acc); recv_pub(Count, Acc, Tout) -> @@ -2036,6 +2030,19 @@ select_port() -> via_stream({quic, _Conn, Stream}) -> Stream. +assert_client_die(C) -> + assert_client_die(C, 100, 10). +assert_client_die(C, _, 0) -> + ct:fail("Client ~p did not die", [C]); +assert_client_die(C, Delay, Retries) -> + case catch emqtt:info(C) of + {'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}} -> + ok; + _Other -> + timer:sleep(Delay), + assert_client_die(C, Delay, Retries - 1) + end. + %% BUILD_WITHOUT_QUIC -else. -endif. diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index d575f09bc..901f25455 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -35,8 +35,18 @@ emqx_bridge_schema { desc_status { desc { - en: """The status of the bridge""" - zh: """Bridge 的状态""" + en: """The status of the bridge
+- connecting: the initial state before any health probes were made.
+- connected: when the bridge passes the health probes.
+- disconnected: when the bridge can not pass health probes.
+- stopped: when the bridge resource is requested to be stopped.
+- inconsistent: When not all the nodes are at the same status.""" + zh: """Bridge 的连接状态
+- connecting: 启动时的初始状态。
+- connected: 桥接驱动健康检查正常。
+- disconnected: 当桥接无法通过健康检查。
+- stopped: 桥接处于停用状态。
+- inconsistent: 集群中有各节点汇报的状态不一致。""" } label: { en: "Bridge Status" @@ -46,8 +56,16 @@ emqx_bridge_schema { desc_node_status { desc { - en: """The status of the bridge for each node""" - zh: """每个节点的 Bridge 状态""" + en: """The status of the bridge for each node. +- connecting: the initial state before any health probes were made.
+- connected: when the bridge passes the health probes.
+- disconnected: when the bridge can not pass health probes.
+- stopped: when the bridge resource is requested to be stopped.""" + zh: """每个节点的 Bridge 状态 +- connecting: 启动时的初始状态。
+- connected: 桥接驱动健康检查正常。
+- disconnected: 当桥接无法通过健康检查。
+- stopped: 桥接处于停用状态。""" } label: { en: "Node Bridge Status" diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a6c2cee27..293692ccd 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -743,40 +743,39 @@ collect_metrics(Bridges) -> aggregate_metrics(AllMetrics) -> InitMetrics = ?EMPTY_METRICS, - lists:foldl( - fun( - #{ - metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 - ) - }, - ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 - ) - ) -> - ?METRICS( - M1 + N1, - M2 + N2, - M3 + N3, - M4 + N4, - M5 + N5, - M6 + N6, - M7 + N7, - M8 + N8, - M9 + N9, - M10 + N10, - M11 + N11, - M12 + N12, - M13 + N13, - M14 + N14, - M15 + N15, - M16 + N16, - M17 + N17 - ) - end, - InitMetrics, - AllMetrics - ). + lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics). + +aggregate_metrics( + #{ + metrics := ?metrics( + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 + ) + }, + ?metrics( + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 + ) +) -> + ?METRICS( + M1 + N1, + M2 + N2, + M3 + N3, + M4 + N4, + M5 + N5, + M6 + N6, + M7 + N7, + M8 + N8, + M9 + N9, + M10 + N10, + M11 + N11, + M12 + N12, + M13 + N13, + M14 + N14, + M15 + N15, + M16 + N16, + M17 + N17 + ); +aggregate_metrics(#{}, Metrics) -> + Metrics. format_resp(Data) -> format_resp(Data, node()). @@ -786,18 +785,26 @@ format_resp( type := Type, name := BridgeName, raw_config := RawConf, - resource_data := #{status := Status, metrics := Metrics} + resource_data := ResourceData }, Node ) -> RawConfFull = fill_defaults(Type, RawConf), - redact(RawConfFull#{ - type => Type, - name => maps:get(<<"name">>, RawConf, BridgeName), - node => Node, - status => Status, - metrics => format_metrics(Metrics) - }). + redact( + maps:merge( + RawConfFull#{ + type => Type, + name => maps:get(<<"name">>, RawConf, BridgeName), + node => Node + }, + format_resource_data(ResourceData) + ) + ). + +format_resource_data(#{status := Status, metrics := Metrics}) -> + #{status => Status, metrics => format_metrics(Metrics)}; +format_resource_data(#{status := Status}) -> + #{status => Status}. format_metrics(#{ counters := #{ diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index c490294eb..ed2baec8f 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -208,7 +208,7 @@ desc(_) -> undefined. status() -> - hoconsc:enum([connected, disconnected, connecting]). + hoconsc:enum([connected, disconnected, connecting, inconsistent]). node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index f81ecb76f..5f863ed63 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -140,17 +140,13 @@ stop_http_server(Sock, Acceptor) -> gen_tcp:close(Sock). listen_on_random_port() -> - Min = 1024, - Max = 65000, - rand:seed(exsplus, erlang:timestamp()), - Port = rand:uniform(Max - Min) + Min, - case - gen_tcp:listen(Port, [ - binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000} - ]) - of - {ok, Sock} -> {Port, Sock}; - {error, eaddrinuse} -> listen_on_random_port() + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} end. accept_loop(Sock, HandleFun, Parent) -> @@ -543,7 +539,9 @@ do_start_stop_bridges(Type, Config) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% Create broken bridge - BadServer = <<"nohost">>, + {ListenPort, Sock} = listen_on_random_port(), + %% Connecting to this endpoint should always timeout + BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), BadName = <<"bad_", (atom_to_binary(Type))/binary>>, {ok, 201, BadBridge1} = request( post, @@ -555,11 +553,15 @@ do_start_stop_bridges(Type, Config) -> <<"name">> := BadName, <<"enable">> := true, <<"server">> := BadServer, - <<"status">> := <<"disconnected">>, + <<"status">> := <<"connecting">>, <<"node_status">> := [_ | _] } = jsx:decode(BadBridge1), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), - {ok, 500, _} = request(post, operation_path(Type, start, BadBridgeID), <<"">>), + ?assertMatch( + {ok, SC, _} when SC == 500 orelse SC == 503, + request(post, operation_path(Type, start, BadBridgeID), <<"">>) + ), + ok = gen_tcp:close(Sock), ok. t_enable_disable_bridges(Config) -> diff --git a/bin/emqx b/bin/emqx index f0366c09d..b2bdded86 100755 --- a/bin/emqx +++ b/bin/emqx @@ -525,11 +525,11 @@ else ## only one emqx node is running, get running args from 'ps -ef' output tmp_nodename=$(echo -e "$PS_LINE" | $GREP -oE "\s\-s?name.*" | awk '{print $2}' || true) tmp_cookie=$(echo -e "$PS_LINE" | $GREP -oE "\s\-setcookie.*" | awk '{print $2}' || true) - tmp_dist="$(echo -e "$PS_LINE" | $GREP -oE '\-ssl_dist_optfile\s.+\s' | awk '{print $2}' || true)" + SSL_DIST_OPTFILE="$(echo -e "$PS_LINE" | $GREP -oE '\-ssl_dist_optfile\s.+\s' | awk '{print $2}' || true)" tmp_ticktime="$(echo -e "$PS_LINE" | $GREP -oE '\s\-kernel\snet_ticktime\s.+\s' | awk '{print $3}' || true)" # data_dir is actually not needed, but kept anyway tmp_datadir="$(echo -e "$PS_LINE" | $GREP -oE "\-emqx_data_dir.*" | sed -E 's#.+emqx_data_dir[[:blank:]]##g' | sed -E 's#[[:blank:]]--$##g' || true)" - if [ -z "$tmp_dist" ]; then + if [ -z "$SSL_DIST_OPTFILE" ]; then tmp_proto='inet_tcp' else tmp_proto='inet_tls' @@ -945,7 +945,7 @@ if [ -n "${EMQX_NODE_COOKIE:-}" ]; then fi COOKIE="${EMQX_NODE__COOKIE:-}" COOKIE_IN_USE="$(get_boot_config 'node.cookie')" -if [ -n "$COOKIE_IN_USE" ] && [ -n "$COOKIE" ] && [ "$COOKIE" != "$COOKIE_IN_USE" ]; then +if [ "$IS_BOOT_COMMAND" != 'yes' ] && [ -n "$COOKIE_IN_USE" ] && [ -n "$COOKIE" ] && [ "$COOKIE" != "$COOKIE_IN_USE" ]; then die "EMQX_NODE__COOKIE is different from the cookie used by $NAME" fi [ -z "$COOKIE" ] && COOKIE="$COOKIE_IN_USE" diff --git a/changes/ce/feat-10019.zh.md b/changes/ce/feat-10019.zh.md index 9ef671b3d..b0eb2a673 100644 --- a/changes/ce/feat-10019.zh.md +++ b/changes/ce/feat-10019.zh.md @@ -1 +1 @@ -为 QUIC 侦听器添加更多底层调优选项。 +为 QUIC 监听器添加更多底层调优选项。 diff --git a/changes/ce/fix-10032.en.md b/changes/ce/fix-10032.en.md new file mode 100644 index 000000000..1db8f9836 --- /dev/null +++ b/changes/ce/fix-10032.en.md @@ -0,0 +1 @@ +When the resource manager is busy trying to establish a connection with the remote, the resource might yet lack any metrics information. Prior to this fix, the `bridges/` API handler crashed in such circumstances. diff --git a/changes/ce/fix-10032.zh.md b/changes/ce/fix-10032.zh.md new file mode 100644 index 000000000..f582066ac --- /dev/null +++ b/changes/ce/fix-10032.zh.md @@ -0,0 +1 @@ +当资源管理器忙于尝试与远程建立连接时,资源可能还缺少任何度量信息。 在此修复之前,`bridges/' API 处理程序在这种情况下崩溃。 diff --git a/changes/ce/fix-10037.en.md b/changes/ce/fix-10037.en.md new file mode 100644 index 000000000..73c92d69d --- /dev/null +++ b/changes/ce/fix-10037.en.md @@ -0,0 +1,2 @@ +Fix Swagger API doc rendering crash. +In version 5.0.18, a bug was introduced that resulted in duplicated field names in the configuration schema. This, in turn, caused the Swagger schema generated to become invalid. diff --git a/changes/ce/fix-10037.zh.md b/changes/ce/fix-10037.zh.md new file mode 100644 index 000000000..5bd447c1f --- /dev/null +++ b/changes/ce/fix-10037.zh.md @@ -0,0 +1,2 @@ +修复 Swagger API 文档渲染崩溃。 +在版本 5.0.18 中,引入了一个错误,导致配置 schema 中出现了重复的配置名称,进而导致生成了无效的 Swagger spec。 diff --git a/changes/ce/fix-10041.en.md b/changes/ce/fix-10041.en.md new file mode 100644 index 000000000..c1aff24c2 --- /dev/null +++ b/changes/ce/fix-10041.en.md @@ -0,0 +1,2 @@ +For influxdb bridge, added integer value placeholder annotation hint to `write_syntax` documentation. +Also supported setting a constant value for the `timestamp` field. diff --git a/changes/ce/fix-10041.zh.md b/changes/ce/fix-10041.zh.md new file mode 100644 index 000000000..d197ea81f --- /dev/null +++ b/changes/ce/fix-10041.zh.md @@ -0,0 +1,2 @@ +为 influxdb 桥接的配置项 `write_syntax` 描述文档增加了类型标识符的提醒。 +另外在配置中支持 `timestamp` 使用一个常量。 diff --git a/changes/ce/fix-10042.en.md b/changes/ce/fix-10042.en.md new file mode 100644 index 000000000..af9213c06 --- /dev/null +++ b/changes/ce/fix-10042.en.md @@ -0,0 +1,5 @@ +Improve behavior of the `replicant` nodes when the `core` cluster becomes partitioned (for example when a core node leaves the cluster). +Previously, the replicant nodes were unable to rebalance connections to the core nodes, until the core cluster became whole again. +This was indicated by the error messages: `[error] line: 182, mfa: mria_lb:list_core_nodes/1, msg: mria_lb_core_discovery divergent cluster`. + +[Mria PR](https://github.com/emqx/mria/pull/123/files) diff --git a/changes/ce/fix-10042.zh.md b/changes/ce/fix-10042.zh.md new file mode 100644 index 000000000..80db204e2 --- /dev/null +++ b/changes/ce/fix-10042.zh.md @@ -0,0 +1,6 @@ +改进 `core` 集群被分割时 `replicant`节点的行为。 +修复前,如果 `core` 集群分裂成两个小集群(例如一个节点离开集群)时,`replicant` 节点无法重新平衡与核心节点的连接,直到核心集群再次变得完整。 +这种个问题会导致 replicant 节点出现如下日志: +`[error] line: 182, mfa: mria_lb:list_core_nodes/1, msg: mria_lb_core_discovery divergent cluster`。 + +[Mria PR](https://github.com/emqx/mria/pull/123/files) diff --git a/changes/ce/fix-10043.en.md b/changes/ce/fix-10043.en.md new file mode 100644 index 000000000..4fd46cb4e --- /dev/null +++ b/changes/ce/fix-10043.en.md @@ -0,0 +1,3 @@ +Fixed two bugs introduced in v5.0.18. +* The environment varialbe `SSL_DIST_OPTFILE` was not set correctly for non-boot commands. +* When cookie is overridden from environment variable, EMQX node is unable to start. diff --git a/changes/ce/fix-10043.zh.md b/changes/ce/fix-10043.zh.md new file mode 100644 index 000000000..6b150f6fb --- /dev/null +++ b/changes/ce/fix-10043.zh.md @@ -0,0 +1,3 @@ +修复 v5.0.18 引入的 2 个bug。 +* 环境变量 `SSL_DIST_OPTFILE` 的值设置错误导致节点无法为 Erlang distribution 启用 SSL。 +* 当节点的 cookie 从环境变量重载 (而不是设置在配置文件中时),节点无法启动的问题。 diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index 8b2eadcfa..d73d62b14 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -22,14 +22,16 @@ See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/ TLDR:
``` [,=[,=]] =[,=] [] -```""" +``` +Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`.""" zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及 [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
TLDR:
``` [,=[,=]] =[,=] [] -```""" +``` +注意,整形数值占位符后需要添加一个字符 `i` 类型标识。例如 `${payload.int_value}i`""" } label { en: "Write Syntax" diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index a395f8a36..c9ef38330 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -663,6 +663,54 @@ t_start_ok_no_subject_tags_write_syntax(Config) -> ), ok. +t_const_timestamp(Config) -> + QueryMode = ?config(query_mode, Config), + Const = erlang:system_time(nanosecond), + ConstBin = integer_to_binary(Const), + TsStr = iolist_to_binary( + calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}]) + ), + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"write_syntax">> => + <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>> + } + ) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{<<"foo">> => 123}, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"payload">> => Payload + }, + ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + async -> ct:sleep(500); + sync -> ok + end, + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{foo => <<"123">>}, + assert_persisted_data(ClientId, Expected, PersistedData), + TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)), + TimeReturned = pad_zero(TimeReturned0), + ?assertEqual(TsStr, TimeReturned). + +%% influxdb returns timestamps without trailing zeros such as +%% "2023-02-28T17:21:51.63678163Z" +%% while the standard should be +%% "2023-02-28T17:21:51.636781630Z" +pad_zero(BinTs) -> + StrTs = binary_to_list(BinTs), + [Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")), + [$Z | NanoNum] = lists:reverse(Nano), + Padding = lists:duplicate(10 - length(Nano), $0), + NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z", + iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")). + t_boolean_variants(Config) -> QueryMode = ?config(query_mode, Config), ?assertMatch( @@ -783,7 +831,7 @@ t_bad_timestamp(Config) -> [ #{ error := [ - {error, {bad_timestamp, [<<"bad_timestamp">>]}} + {error, {bad_timestamp, <<"bad_timestamp">>}} ] } ], @@ -793,7 +841,7 @@ t_bad_timestamp(Config) -> ?assertEqual( {error, {unrecoverable_error, [ - {error, {bad_timestamp, [<<"bad_timestamp">>]}} + {error, {bad_timestamp, <<"bad_timestamp">>}} ]}}, Return ); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 453f949be..a1496cabd 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -492,11 +492,11 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error is_list(Ts) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of - [TsInt] when is_integer(TsInt) -> + case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of + {ok, TsInt} -> Item1 = Item#{timestamp => TsInt}, continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); - BadTs -> + {error, BadTs} -> lines_to_points(Data, Rest, ResultPointsAcc, [ {error, {bad_timestamp, BadTs}} | ErrorPointsAcc ]) @@ -506,6 +506,16 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error -> continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc). +parse_timestamp([TsInt]) when is_integer(TsInt) -> + {ok, TsInt}; +parse_timestamp([TsBin]) -> + try + {ok, binary_to_integer(TsBin)} + catch + _:_ -> + {error, TsBin} + end. + continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> case line_to_point(Data, Item) of #{fields := Fields} when map_size(Fields) =:= 0 -> diff --git a/mix.exs b/mix.exs index b262dd969..d72f64039 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.14.1", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.14.2", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.8", override: true}, @@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.35.3", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.36.0", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index 302f3a2f9..108a64ecf 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.1"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}} @@ -69,7 +69,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} diff --git a/scripts/test/influx/README.md b/scripts/test/influx/README.md new file mode 100644 index 000000000..ee492e69e --- /dev/null +++ b/scripts/test/influx/README.md @@ -0,0 +1,25 @@ +# Test influxdb integration + +This script starts two EMQX nodes and a influxdb server in docker container. +The bootstraping rule engine and data bridge config is provided in influx-bridge.conf +which got included in the bootstraping config bundle emqx.conf. + +## Start the cluster + +./start.sh + +## How to run tests + +The rule and bridge are configured to pipe data from MQTT topic `t/#` to the 'myvalues' measurement in the 'mqtt' bucket. + +### Manual verification steps + +* Start the cluster +* Send mqtt messages to topic `/t/a` with a JSON object as MQTT paylaod like `{"value": 1}` +* Observe data in influxdb `curl -k -H 'Authorization: Token abcdefg' -G 'https://localhost:8086/query?pretty=true' --data-urlencode "db=mqtt" --data-urlencode "q=SELECT * from myvalues"` + +Example output the curl query against influxdb: + +``` +{"results":[{"statement_id":0,"series":[{"name":"myvalues","columns":["time","clientid","value"],"values":[["2023-02-28T11:13:29.039Z","a1",123]]}]}] +``` diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 3b5bb9f9f..df10a0ec6 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -30,7 +30,7 @@ bridges { versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] } token = "abcdefg" - write_syntax = "mqtt,clientid=${clientid} value=${payload.value}" + write_syntax = "myvalues,clientid=${clientid} value=${payload.value}i" } } } diff --git a/scripts/test/start-two-nodes-in-docker.sh b/scripts/test/start-two-nodes-in-docker.sh index 16f46a53f..c174bc630 100755 --- a/scripts/test/start-two-nodes-in-docker.sh +++ b/scripts/test/start-two-nodes-in-docker.sh @@ -8,9 +8,10 @@ set -euo pipefail ## this is why a docker network is created, and the containers's names have a dot. # ensure dir -cd -P -- "$(dirname -- "$0")/.." +cd -P -- "$(dirname -- "$0")/../../" -IMAGE="${1}" +IMAGE1="${1}" +IMAGE2="${2:-${IMAGE1}}" NET='emqx.io' NODE1="node1.$NET" @@ -35,7 +36,7 @@ docker run -d -t --restart=always --name "$NODE1" \ -e EMQX_listeners__wss__default__enable=false \ -e EMQX_listeners__tcp__default__proxy_protocol=true \ -e EMQX_listeners__ws__default__proxy_protocol=true \ - "$IMAGE" + "$IMAGE1" docker run -d -t --restart=always --name "$NODE2" \ --net "$NET" \ @@ -47,7 +48,7 @@ docker run -d -t --restart=always --name "$NODE2" \ -e EMQX_listeners__wss__default__enable=false \ -e EMQX_listeners__tcp__default__proxy_protocol=true \ -e EMQX_listeners__ws__default__proxy_protocol=true \ - "$IMAGE" + "$IMAGE2" mkdir -p tmp cat < tmp/haproxy.cfg @@ -84,14 +85,17 @@ defaults ## API ##---------------------------------------------------------------- frontend emqx_dashboard - mode tcp - option tcplog - bind *:18083 - default_backend emqx_dashboard_back + mode tcp + option tcplog + bind *:18083 + default_backend emqx_dashboard_back backend emqx_dashboard_back + # Must use a consistent dispatch when EMQX is running on different versions + # because the js files for the dashboard is chunked, having the backends sharing + # load randomly will cause the browser fail to GET some chunks (or get bad chunks if names clash) + balance source mode http - # balance static-rr server emqx-1 $NODE1:18083 server emqx-2 $NODE2:18083 @@ -142,7 +146,7 @@ wait_for_emqx() { container="$1" wait_limit="$2" wait_sec=0 - while ! docker exec "$container" emqx_ctl status >/dev/null 2>&1; do + while ! docker exec "$container" emqx ctl status; do wait_sec=$(( wait_sec + 1 )) if [ $wait_sec -gt "$wait_limit" ]; then echo "timeout wait for EMQX" @@ -178,4 +182,4 @@ wait_for_haproxy 10 echo -docker exec $NODE1 emqx_ctl cluster join "emqx@$NODE2" +docker exec $NODE1 emqx ctl cluster join "emqx@$NODE2"