diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 61373f638..3654b8fae 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -653,11 +653,17 @@ emqx_cluster(Specs0, CommonOpts) -> ]), %% Set the default node of the cluster: CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], - JoinTo = + JoinTo0 = case CoreNodes of [First | _] -> First; _ -> undefined end, + JoinTo = + case maps:find(join_to, CommonOpts) of + {ok, true} -> JoinTo0; + {ok, JT} -> JT; + error -> JoinTo0 + end, [ {Name, merge_opts(Opts, #{ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 9802d5fe8..847270664 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -882,6 +882,29 @@ format_metrics(#{ Rate5m, RateMax, Rcvd + ); +format_metrics(_Metrics) -> + %% Empty metrics: can happen when a node joins another and a + %% bridge is not yet replicated to it, so the counters map is + %% empty. + ?METRICS( + _Dropped = 0, + _DroppedOther = 0, + _DroppedExpired = 0, + _DroppedQueueFull = 0, + _DroppedResourceNotFound = 0, + _DroppedResourceStopped = 0, + _Matched = 0, + _Queued = 0, + _Retried = 0, + _LateReply = 0, + _SentFailed = 0, + _SentInflight = 0, + _SentSucc = 0, + _Rate = 0, + _Rate5m = 0, + _RateMax = 0, + _Rcvd = 0 ). fill_defaults(Type, RawConf) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 288b1da29..27c1c779a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -70,18 +70,22 @@ all() -> [ {group, single}, + {group, cluster_later_join}, {group, cluster} ]. 
groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ t_broken_bpapi_vsn, t_old_bpapi_vsn, t_bridges_probe ], + ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics], [ - {single, [], emqx_common_test_helpers:all(?MODULE)}, - {cluster, [], emqx_common_test_helpers:all(?MODULE) -- SingleOnlyTests} + {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, + {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, + {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} ]. suite() -> @@ -104,6 +108,17 @@ init_per_group(cluster, Config) -> ok = erpc:call(NodePrimary, fun() -> init_node(primary) end), _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest], [{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config]; +init_per_group(cluster_later_join, Config) -> + Cluster = mk_cluster_specs(Config, #{join_to => undefined}), + ct:pal("Starting ~p", [Cluster]), + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts) + || {Name, Opts} <- Cluster + ], + [NodePrimary | NodesRest] = Nodes, + ok = erpc:call(NodePrimary, fun() -> init_node(primary) end), + _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest], + [{group, cluster_later_join}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config]; init_per_group(_, Config) -> ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS), ok = load_suite_config(emqx_rule_engine), @@ -111,6 +126,9 @@ init_per_group(_, Config) -> [{group, single}, {api_node, node()} | Config]. mk_cluster_specs(Config) -> + mk_cluster_specs(Config, #{}). 
+ +mk_cluster_specs(Config, Opts) -> Specs = [ {core, emqx_bridge_api_SUITE1, #{}}, {core, emqx_bridge_api_SUITE2, #{}} @@ -132,6 +150,7 @@ mk_cluster_specs(Config) -> load_apps => ?SUITE_APPS ++ [emqx_dashboard], env_handler => fun load_suite_config/1, load_schema => false, + join_to => maps:get(join_to, Opts, true), priv_data_dir => ?config(priv_dir, Config) }, emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). @@ -164,7 +183,10 @@ load_suite_config(emqx_bridge) -> load_suite_config(_) -> ok. -end_per_group(cluster, Config) -> +end_per_group(Group, Config) when + Group =:= cluster; + Group =:= cluster_later_join +-> ok = lists:foreach( fun(Node) -> _ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]), @@ -1298,6 +1320,44 @@ t_inconsistent_webhook_request_timeouts(Config) -> validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name), ok. +t_cluster_later_join_metrics(Config) -> + Port = ?config(port, Config), + APINode = ?config(api_node, Config), + ClusterNodes = ?config(cluster_nodes, Config), + [OtherNode | _] = ClusterNodes -- [APINode], + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + BridgeParams = ?HTTP_BRIDGE(URL1, Name), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), + ?check_trace( + begin + %% Create a bridge on only one of the nodes. + ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)), + %% Pre-condition. + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"success">> := _}, + <<"node_metrics">> := [_ | _] + }}, + request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) + ), + %% Now join the other node with the api node. + ok = erpc:call(OtherNode, ekka, join, [APINode]), + %% Check metrics; shouldn't crash even if the bridge is not + %% ready on the node that just joined the cluster. 
+ ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"success">> := _}, + <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] + }}, + request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) + ), + ok + end, + [] + ), + ok. + validate_resource_request_timeout(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), diff --git a/changes/ce/fix-10743.en.md b/changes/ce/fix-10743.en.md new file mode 100644 index 000000000..95e6b3652 --- /dev/null +++ b/changes/ce/fix-10743.en.md @@ -0,0 +1 @@ +Fixes an issue where trying to get bridge info or metrics could result in a crash when a node is joining a cluster.