From 8ce16fd7d93e2b28d3d31b44ef5569592d70637c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 18 Jun 2024 10:38:08 -0300 Subject: [PATCH 1/2] fix(dsreplmeta): check site status when fetching shard info Fixes https://emqx.atlassian.net/browse/EMQX-12356 --- .../src/emqx_ds_replication_layer_meta.erl | 11 +- .../test/emqx_mgmt_api_ds_SUITE.erl | 113 +++++++++++++++++- .../test/emqx_mgmt_api_test_util.erl | 34 ++++++ changes/ce/fix-13291.en.md | 1 + 4 files changed, 155 insertions(+), 4 deletions(-) create mode 100644 changes/ce/fix-13291.en.md diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 387dddbcf..9d9d69c90 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -273,7 +273,7 @@ shards(DB) -> [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs]. -spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - #{replica_set := #{site() => #{status => up | joining}}} + #{replica_set := #{site() => #{status => up | down}}} | undefined. shard_info(DB, Shard) -> case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of @@ -282,8 +282,13 @@ shard_info(DB, Shard) -> [#?SHARD_TAB{replica_set = Replicas}] -> ReplicaSet = maps:from_list([ begin - %% TODO: - ReplInfo = #{status => up}, + Status = + case mria:cluster_status(?MODULE:node(I)) of + running -> up; + stopped -> down; + false -> down + end, + ReplInfo = #{status => Status}, {I, ReplInfo} end || I <- Replicas diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl index 881ce8e3f..dc28864be 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -23,8 +23,22 @@ -import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]). 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + all() -> - emqx_common_test_helpers:all(?MODULE). + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {group, cluster}, + AllTCs -- cluster_tests() + ]. + +groups() -> + [{cluster, [], cluster_tests()}]. + +cluster_tests() -> + [ + t_get_db_node_down + ]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( @@ -41,6 +55,38 @@ init_per_suite(Config) -> end_per_suite(Config) -> ok = emqx_cth_suite:stop(?config(suite_apps, Config)). +init_per_group(cluster = Group, Config) -> + AppSpecs = [ + {emqx, "durable_sessions.enable = true\n"}, + emqx_management + ], + Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( + %% Using different port so it doesn't clash with the master node's port. + "dashboard.listeners.http { enable = true, bind = 18084 }" + ), + Cluster = [ + {mgmt_api_ds_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, + {mgmt_api_ds_SUITE2, #{role => core, apps => AppSpecs}} + ], + ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(Group, Config)}, + NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), + Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), + [ + {nodes, Nodes}, + {node_specs, NodeSpecs}, + {cluster_opts, ClusterOpts} + | Config + ]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(cluster, Config) -> + Nodes = ?config(nodes, Config), + emqx_cth_cluster:stop(Nodes), + ok; +end_per_group(_Group, _Config) -> + ok. + init_per_testcase(_, Config) -> Config. @@ -124,6 +170,62 @@ t_get_db(_) -> emqx_utils_json:decode(Response) ). +%% Smoke test that checks the status of down replica nodes. 
+t_get_db_node_down(Config) -> + [_N1, N2] = ?config(nodes, Config), + [_N1Spec, N2Spec] = ?config(node_specs, Config), + Path = api_path_cluster(["ds", "storages", "messages"], Config), + Params = "", + + {Status1, Res1} = simple_request(get, Path, Params), + ?assertEqual(200, Status1), + #{<<"shards">> := Shards1} = Res1, + Replicas1 = [Replica || #{<<"replicas">> := Replicas} <- Shards1, Replica <- Replicas], + ?assert( + lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas1), + #{replicas => Replicas1} + ), + + %% Now make one of the nodes down. + StoppedSite = ?ON(N2, emqx_ds_replication_layer_meta:this_site()), + ok = emqx_cth_cluster:stop_node(N2), + ct:sleep(1_000), + + {Status2, Res2} = simple_request(get, Path, Params), + ?assertEqual(200, Status2), + #{<<"shards">> := Shards2} = Res2, + Replicas2 = [Replica || #{<<"replicas">> := Replicas} <- Shards2, Replica <- Replicas], + ?assert( + lists:all( + fun(#{<<"site">> := Site, <<"status">> := Status}) -> + case Site =:= StoppedSite of + true -> Status =:= <<"down">>; + false -> Status =:= <<"up">> + end + end, + Replicas2 + ), + #{ + replicas => Replicas2, + stopped_site => StoppedSite + } + ), + + %% Back online + emqx_cth_cluster:restart(N2Spec), + ct:sleep(1_000), + + {Status3, Res3} = simple_request(get, Path, Params), + ?assertEqual(200, Status3), + #{<<"shards">> := Shards3} = Res3, + Replicas3 = [Replica || #{<<"replicas">> := Replicas} <- Shards3, Replica <- Replicas], + ?assert( + lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas3), + #{replicas => Replicas3} + ), + + ok. + t_get_replicas(_) -> %% Unknown DBs must result in error 400 (since the DS parameter is an enum): Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]), @@ -186,3 +288,12 @@ parse_error({ok, Code, JSON}) -> {ok, Code, emqx_utils_json:decode(JSON)}; parse_error(Err) -> Err. 
+ +simple_request(Method, Path, Params) -> + emqx_mgmt_api_test_util:simple_request(Method, Path, Params). + +api_path_cluster(Parts, Config) -> + [Node1 | _] = ?config(nodes, Config), + Port = erpc:call(Node1, emqx_config, get, [[dashboard, listeners, http, bind]]), + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + emqx_mgmt_api_test_util:api_path(Host, Parts). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 49f356316..c9b763008 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -285,3 +285,37 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> FileNames ), erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). + +simple_request(Method, Path, Params) -> + simple_request(Method, Path, Params, _Opts = #{}). + +simple_request(Method, Path, Params, _Opts) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case + emqx_mgmt_api_test_util:request_api( + Method, Path, _QueryParams = "", AuthHeader, Params, Opts + ) + of + {ok, {{_, Status, _}, _Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {Status, Body}; + {error, {{_, Status, _}, _Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {Status, Body} + end. + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. 
diff --git a/changes/ce/fix-13291.en.md b/changes/ce/fix-13291.en.md new file mode 100644 index 000000000..524749253 --- /dev/null +++ b/changes/ce/fix-13291.en.md @@ -0,0 +1 @@ +Fixed an issue where durable storage sites that were down were being reported as up. From 4c54ab63797b5ffdc10e91e082d22633234bf590 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 18 Jun 2024 11:31:50 -0300 Subject: [PATCH 2/2] test: remove tests per review request --- .../test/emqx_mgmt_api_ds_SUITE.erl | 113 +----------------- .../test/emqx_mgmt_api_test_util.erl | 34 ------ 2 files changed, 1 insertion(+), 146 deletions(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl index dc28864be..881ce8e3f 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -23,22 +23,8 @@ -import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]). --define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). - all() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - [ - {group, cluster}, - AllTCs -- cluster_tests() - ]. - -groups() -> - [{cluster, [], cluster_tests()}]. - -cluster_tests() -> - [ - t_get_db_node_down - ]. + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> Apps = emqx_cth_suite:start( @@ -55,38 +41,6 @@ init_per_suite(Config) -> end_per_suite(Config) -> ok = emqx_cth_suite:stop(?config(suite_apps, Config)). -init_per_group(cluster = Group, Config) -> - AppSpecs = [ - {emqx, "durable_sessions.enable = true\n"}, - emqx_management - ], - Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( - %% Using different port so it doesn't clash with the master node's port. 
- "dashboard.listeners.http { enable = true, bind = 18084 }" - ), - Cluster = [ - {mgmt_api_ds_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}}, - {mgmt_api_ds_SUITE2, #{role => core, apps => AppSpecs}} - ], - ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(Group, Config)}, - NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), - Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), - [ - {nodes, Nodes}, - {node_specs, NodeSpecs}, - {cluster_opts, ClusterOpts} - | Config - ]; -init_per_group(_Group, Config) -> - Config. - -end_per_group(cluster, Config) -> - Nodes = ?config(nodes, Config), - emqx_cth_cluster:stop(Nodes), - ok; -end_per_group(_Group, _Config) -> - ok. - init_per_testcase(_, Config) -> Config. @@ -170,62 +124,6 @@ t_get_db(_) -> emqx_utils_json:decode(Response) ). -%% Smoke test that checks the status of down replica nodes. -t_get_db_node_down(Config) -> - [_N1, N2] = ?config(nodes, Config), - [_N1Spec, N2Spec] = ?config(node_specs, Config), - Path = api_path_cluster(["ds", "storages", "messages"], Config), - Params = "", - - {Status1, Res1} = simple_request(get, Path, Params), - ?assertEqual(200, Status1), - #{<<"shards">> := Shards1} = Res1, - Replicas1 = [Replica || #{<<"replicas">> := Replicas} <- Shards1, Replica <- Replicas], - ?assert( - lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas1), - #{replicas => Replicas1} - ), - - %% Now make one of the nodes down. 
- StoppedSite = ?ON(N2, emqx_ds_replication_layer_meta:this_site()), - ok = emqx_cth_cluster:stop_node(N2), - ct:sleep(1_000), - - {Status2, Res2} = simple_request(get, Path, Params), - ?assertEqual(200, Status2), - #{<<"shards">> := Shards2} = Res2, - Replicas2 = [Replica || #{<<"replicas">> := Replicas} <- Shards2, Replica <- Replicas], - ?assert( - lists:all( - fun(#{<<"site">> := Site, <<"status">> := Status}) -> - case Site =:= StoppedSite of - true -> Status =:= <<"down">>; - false -> Status =:= <<"up">> - end - end, - Replicas2 - ), - #{ - replicas => Replicas2, - stopped_site => StoppedSite - } - ), - - %% Back online - emqx_cth_cluster:restart(N2Spec), - ct:sleep(1_000), - - {Status3, Res3} = simple_request(get, Path, Params), - ?assertEqual(200, Status3), - #{<<"shards">> := Shards3} = Res3, - Replicas3 = [Replica || #{<<"replicas">> := Replicas} <- Shards3, Replica <- Replicas], - ?assert( - lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas3), - #{replicas => Replicas3} - ), - - ok. - t_get_replicas(_) -> %% Unknown DBs must result in error 400 (since the DS parameter is an enum): Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]), @@ -288,12 +186,3 @@ parse_error({ok, Code, JSON}) -> {ok, Code, emqx_utils_json:decode(JSON)}; parse_error(Err) -> Err. - -simple_request(Method, Path, Params) -> - emqx_mgmt_api_test_util:simple_request(Method, Path, Params). - -api_path_cluster(Parts, Config) -> - [Node1 | _] = ?config(nodes, Config), - Port = erpc:call(Node1, emqx_config, get, [[dashboard, listeners, http, bind]]), - Host = "http://127.0.0.1:" ++ integer_to_list(Port), - emqx_mgmt_api_test_util:api_path(Host, Parts). 
diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index c9b763008..49f356316 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -285,37 +285,3 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> FileNames ), erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). - -simple_request(Method, Path, Params) -> - simple_request(Method, Path, Params, _Opts = #{}). - -simple_request(Method, Path, Params, _Opts) -> - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - case - emqx_mgmt_api_test_util:request_api( - Method, Path, _QueryParams = "", AuthHeader, Params, Opts - ) - of - {ok, {{_, Status, _}, _Headers, Body0}} -> - Body = maybe_json_decode(Body0), - {Status, Body}; - {error, {{_, Status, _}, _Headers, Body0}} -> - Body = - case emqx_utils_json:safe_decode(Body0, [return_maps]) of - {ok, Decoded0 = #{<<"message">> := Msg0}} -> - Msg = maybe_json_decode(Msg0), - Decoded0#{<<"message">> := Msg}; - {ok, Decoded0} -> - Decoded0; - {error, _} -> - Body0 - end, - {Status, Body} - end. - -maybe_json_decode(X) -> - case emqx_utils_json:safe_decode(X, [return_maps]) of - {ok, Decoded} -> Decoded; - {error, _} -> X - end.