fix: avoid error 500 when node is re-joining cluster
Fixes https://emqx.atlassian.net/browse/EMQX-9899
This commit is contained in:
parent
1f36726cab
commit
ea86f4442b
|
@ -147,7 +147,7 @@ unwrap_erpc({throw, A}) ->
|
||||||
{error, A};
|
{error, A};
|
||||||
unwrap_erpc({error, {exception, Err, _Stack}}) ->
|
unwrap_erpc({error, {exception, Err, _Stack}}) ->
|
||||||
{error, Err};
|
{error, Err};
|
||||||
unwrap_erpc({error, {exit, Err}}) ->
|
unwrap_erpc({exit, Err}) ->
|
||||||
{error, Err};
|
{error, Err};
|
||||||
unwrap_erpc({error, {erpc, Err}}) ->
|
unwrap_erpc({error, {erpc, Err}}) ->
|
||||||
{error, Err}.
|
{error, Err}.
|
||||||
|
|
|
@ -423,8 +423,8 @@ users(get, #{query_string := QueryString}) ->
|
||||||
of
|
of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, Error} ->
|
||||||
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
|
||||||
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Result ->
|
Result ->
|
||||||
{200, Result}
|
{200, Result}
|
||||||
|
@ -459,8 +459,8 @@ clients(get, #{query_string := QueryString}) ->
|
||||||
of
|
of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, Error} ->
|
||||||
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
|
||||||
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Result ->
|
Result ->
|
||||||
{200, Result}
|
{200, Result}
|
||||||
|
|
|
@ -756,7 +756,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
|
||||||
}).
|
}).
|
||||||
|
|
||||||
format_bridge_metrics(Bridges) ->
|
format_bridge_metrics(Bridges) ->
|
||||||
NodeMetrics = collect_metrics(Bridges),
|
FilteredBridges = lists:filter(
|
||||||
|
fun
|
||||||
|
({_Node, Metric}) when is_map(Metric) -> true;
|
||||||
|
(_) -> false
|
||||||
|
end,
|
||||||
|
Bridges
|
||||||
|
),
|
||||||
|
NodeMetrics = collect_metrics(FilteredBridges),
|
||||||
#{
|
#{
|
||||||
metrics => aggregate_metrics(NodeMetrics),
|
metrics => aggregate_metrics(NodeMetrics),
|
||||||
node_metrics => NodeMetrics
|
node_metrics => NodeMetrics
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_ctl, [
|
{application, emqx_ctl, [
|
||||||
{description, "Backend for emqx_ctl script"},
|
{description, "Backend for emqx_ctl script"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_ctl_app, []}},
|
{mod, {emqx_ctl_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -228,7 +228,7 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
|
||||||
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
|
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
|
||||||
{reply, ok, next_seq(State)};
|
{reply, ok, next_seq(State)};
|
||||||
[[OriginSeq] | _] ->
|
[[OriginSeq] | _] ->
|
||||||
?LOG_WARNING(#{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
|
?LOG_WARNING(#{msg => "CMD_overridden", cmd => Cmd, mf => MF}),
|
||||||
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}),
|
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}),
|
||||||
{reply, ok, State}
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_gateway, [
|
{application, emqx_gateway, [
|
||||||
{description, "The Gateway management application"},
|
{description, "The Gateway management application"},
|
||||||
{vsn, "0.1.16"},
|
{vsn, "0.1.17"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_gateway_app, []}},
|
{mod, {emqx_gateway_app, []}},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},
|
||||||
|
|
|
@ -133,8 +133,10 @@ clients(get, #{
|
||||||
case Result of
|
case Result of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, Error} ->
|
||||||
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
Message = list_to_binary(
|
||||||
|
io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])
|
||||||
|
),
|
||||||
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Response ->
|
Response ->
|
||||||
{200, Response}
|
{200, Response}
|
||||||
|
|
|
@ -134,8 +134,8 @@ do_node_query(
|
||||||
ResultAcc
|
ResultAcc
|
||||||
) ->
|
) ->
|
||||||
case do_query(Node, QueryState) of
|
case do_query(Node, QueryState) of
|
||||||
{error, {badrpc, R}} ->
|
{error, Error} ->
|
||||||
{error, Node, {badrpc, R}};
|
{error, Node, Error};
|
||||||
{Rows, NQueryState = #{complete := Complete}} ->
|
{Rows, NQueryState = #{complete := Complete}} ->
|
||||||
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
||||||
{enough, NResultAcc} ->
|
{enough, NResultAcc} ->
|
||||||
|
@ -179,8 +179,8 @@ do_cluster_query(
|
||||||
ResultAcc
|
ResultAcc
|
||||||
) ->
|
) ->
|
||||||
case do_query(Node, QueryState) of
|
case do_query(Node, QueryState) of
|
||||||
{error, {badrpc, R}} ->
|
{error, Error} ->
|
||||||
{error, Node, {badrpc, R}};
|
{error, Node, Error};
|
||||||
{Rows, NQueryState = #{complete := Complete}} ->
|
{Rows, NQueryState = #{complete := Complete}} ->
|
||||||
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
|
||||||
{enough, NResultAcc} ->
|
{enough, NResultAcc} ->
|
||||||
|
@ -275,7 +275,7 @@ do_query(Node, QueryState) when Node =:= node() ->
|
||||||
do_select(Node, QueryState);
|
do_select(Node, QueryState);
|
||||||
do_query(Node, QueryState) ->
|
do_query(Node, QueryState) ->
|
||||||
case
|
case
|
||||||
rpc:call(
|
catch rpc:call(
|
||||||
Node,
|
Node,
|
||||||
?MODULE,
|
?MODULE,
|
||||||
do_query,
|
do_query,
|
||||||
|
@ -284,6 +284,7 @@ do_query(Node, QueryState) ->
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
{badrpc, _} = R -> {error, R};
|
{badrpc, _} = R -> {error, R};
|
||||||
|
{'EXIT', _} = R -> {error, R};
|
||||||
Ret -> Ret
|
Ret -> Ret
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -298,15 +299,24 @@ do_select(
|
||||||
) ->
|
) ->
|
||||||
QueryState = maybe_apply_total_query(Node, QueryState0),
|
QueryState = maybe_apply_total_query(Node, QueryState0),
|
||||||
Result =
|
Result =
|
||||||
case maps:get(continuation, QueryState, undefined) of
|
try
|
||||||
undefined ->
|
case maps:get(continuation, QueryState, undefined) of
|
||||||
ets:select(Tab, Ms, Limit);
|
undefined ->
|
||||||
Continuation ->
|
ets:select(Tab, Ms, Limit);
|
||||||
%% XXX: Repair is necessary because we pass Continuation back
|
Continuation ->
|
||||||
%% and forth through the nodes in the `do_cluster_query`
|
%% XXX: Repair is necessary because we pass Continuation back
|
||||||
ets:select(ets:repair_continuation(Continuation, Ms))
|
%% and forth through the nodes in the `do_cluster_query`
|
||||||
|
ets:select(ets:repair_continuation(Continuation, Ms))
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
exit:_ = Exit ->
|
||||||
|
{error, Exit};
|
||||||
|
Type:Reason:Stack ->
|
||||||
|
{error, #{exception => Type, reason => Reason, stacktrace => Stack}}
|
||||||
end,
|
end,
|
||||||
case Result of
|
case Result of
|
||||||
|
{error, _} ->
|
||||||
|
{[], mark_complete(QueryState)};
|
||||||
{Rows, '$end_of_table'} ->
|
{Rows, '$end_of_table'} ->
|
||||||
NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
|
NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
|
||||||
{NRows, mark_complete(QueryState)};
|
{NRows, mark_complete(QueryState)};
|
||||||
|
@ -354,7 +364,11 @@ counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
|
||||||
[{MatchHead, Conditions, _Return}] = Ms,
|
[{MatchHead, Conditions, _Return}] = Ms,
|
||||||
CountingMs = [{MatchHead, Conditions, [true]}],
|
CountingMs = [{MatchHead, Conditions, [true]}],
|
||||||
fun(Tab) ->
|
fun(Tab) ->
|
||||||
ets:select_count(Tab, CountingMs)
|
try
|
||||||
|
ets:select_count(Tab, CountingMs)
|
||||||
|
catch
|
||||||
|
_Type:_Reason -> 0
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
|
counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
|
||||||
%% XXX: Calculating the total number for a fuzzy searching is very very expensive
|
%% XXX: Calculating the total number for a fuzzy searching is very very expensive
|
||||||
|
|
|
@ -123,8 +123,8 @@ alarms(get, #{query_string := QString}) ->
|
||||||
of
|
of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, Error} ->
|
||||||
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
|
||||||
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Response ->
|
Response ->
|
||||||
{200, Response}
|
{200, Response}
|
||||||
|
|
|
@ -120,8 +120,8 @@ do_list(Params) ->
|
||||||
of
|
of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, Error} ->
|
||||||
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
|
||||||
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Response ->
|
Response ->
|
||||||
{200, Response}
|
{200, Response}
|
||||||
|
|
|
@ -339,6 +339,9 @@ param_path_id() ->
|
||||||
of
|
of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
|
{400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
|
||||||
|
{error, Node, Error} ->
|
||||||
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
|
||||||
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Result ->
|
Result ->
|
||||||
{200, Result}
|
{200, Result}
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix Internal Error 500 that occurred sometimes when bridge statistics page was updated while a node was (re)joining the cluster.
|
Loading…
Reference in New Issue