Merge pull request #11107 from thalesmg/fix-mongo-health-check-reason-master

fix(mongo): return health check failure reason
This commit is contained in:
Thales Macedo Garitezi 2023-06-22 09:30:34 -03:00 committed by GitHub
commit 2f00cf7f84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 25 deletions

View File

@ -36,5 +36,14 @@ jobs:
env:
MIX_ENV: emqx-enterprise
PROFILE: emqx-enterprise
- name: produced lock files
uses: actions/upload-artifact@v3
if: failure()
with:
name: produced_lock_files
path: |
mix.lock
rebar.lock
retention-days: 1
...

View File

@ -269,28 +269,46 @@ on_query(
{ok, Result}
end.
on_get_status(InstId, #{pool_name := PoolName}) ->
on_get_status(InstId, State = #{pool_name := PoolName}) ->
case health_check(PoolName) of
true ->
ok ->
?tp(debug, emqx_connector_mongo_health_check, #{
instance_id => InstId,
status => ok
}),
connected;
false ->
{error, Reason} ->
?tp(warning, emqx_connector_mongo_health_check, #{
instance_id => InstId,
reason => Reason,
status => failed
}),
disconnected
{disconnected, State, Reason}
end.
health_check(PoolName) ->
emqx_resource_pool:health_check_workers(
PoolName,
fun ?MODULE:check_worker_health/1,
?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
).
Results =
emqx_resource_pool:health_check_workers(
PoolName,
fun ?MODULE:check_worker_health/1,
?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
#{return_values => true}
),
case Results of
{ok, []} ->
{error, worker_processes_dead};
{ok, Values} ->
case lists:partition(fun(V) -> V =:= ok end, Values) of
{_Ok, []} ->
ok;
{_Ok, [{error, Reason} | _Errors]} ->
{error, Reason};
{_Ok, [Error | _Errors]} ->
{error, Error}
end;
{error, Reason} ->
{error, Reason}
end.
%% ===================================================================
@ -302,9 +320,9 @@ check_worker_health(Conn) ->
msg => "mongo_connection_get_status_error",
reason => Reason
}),
false;
{error, Reason};
_ ->
true
ok
catch
Class:Error ->
?SLOG(warning, #{
@ -312,7 +330,7 @@ check_worker_health(Conn) ->
class => Class,
error => Error
}),
false
{error, {Class, Error}}
end.
do_test_query(Conn) ->

View File

@ -20,7 +20,8 @@
start/3,
stop/1,
health_check_workers/2,
health_check_workers/3
health_check_workers/3,
health_check_workers/4
]).
-include_lib("emqx/include/logger.hrl").
@ -66,9 +67,13 @@ stop(Name) ->
end.
health_check_workers(PoolName, CheckFunc) ->
health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}).
health_check_workers(PoolName, CheckFunc, Timeout) ->
health_check_workers(PoolName, CheckFunc, Timeout, _Opts = #{}).
health_check_workers(PoolName, CheckFunc, Timeout, Opts) ->
ReturnValues = maps:get(return_values, Opts, false),
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
@ -76,18 +81,26 @@ health_check_workers(PoolName, CheckFunc, Timeout) ->
{ok, Conn} ->
erlang:is_process_alive(Conn) andalso
ecpool_worker:exec(Worker, CheckFunc, Timeout);
_ ->
false
Error ->
Error
end
end,
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
Results =
try
{ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)}
catch
exit:timeout ->
{error, timeout}
end,
case ReturnValues of
true ->
Results;
false ->
case Results of
{ok, []} -> false;
{ok, Rs = [_ | _]} -> lists:all(fun(St) -> St =:= true end, Rs);
_ -> false
end
end.
parse_reason({

View File

@ -0,0 +1 @@
Now we return the health check failure reason when probing a MongoDB bridge.

View File

@ -28,7 +28,8 @@ group_tests() ->
t_setup_via_http_api_and_publish,
t_payload_template,
t_collection_template,
t_mongo_date_rule_engine_functions
t_mongo_date_rule_engine_functions,
t_get_status_server_selection_too_short
].
groups() ->
@ -317,6 +318,27 @@ send_message(Config, Payload) ->
BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
emqx_bridge:send_message(BridgeID, Payload).
probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}).
probe_bridge_api(Config, Overrides) ->
Name = ?config(mongo_name, Config),
TypeBin = mongo_type_bin(?config(mongo_type, Config)),
MongoConfig0 = ?config(mongo_config, Config),
MongoConfig = emqx_utils_maps:deep_merge(MongoConfig0, Overrides),
Params = MongoConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
Error -> Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -453,3 +475,21 @@ t_mongo_date_rule_engine_functions(Config) ->
find_all_wait_until_non_empty(Config)
),
ok.
t_get_status_server_selection_too_short(Config) ->
Res = probe_bridge_api(
Config,
#{
<<"topology">> => #{<<"server_selection_timeout_ms">> => <<"1ms">>}
}
),
?assertMatch({error, {{_, 400, _}, _Headers, _Body}}, Res),
{error, {{_, 400, _}, _Headers, Body}} = Res,
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"timeout">>
},
emqx_utils_json:decode(Body)
),
ok.