diff --git a/.github/workflows/elixir_deps_check.yaml b/.github/workflows/elixir_deps_check.yaml index 223e83dff..b8364c6da 100644 --- a/.github/workflows/elixir_deps_check.yaml +++ b/.github/workflows/elixir_deps_check.yaml @@ -36,5 +36,14 @@ jobs: env: MIX_ENV: emqx-enterprise PROFILE: emqx-enterprise + - name: produced lock files + uses: actions/upload-artifact@v3 + if: failure() + with: + name: produced_lock_files + path: | + mix.lock + rebar.lock + retention-days: 1 ... diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 3a79cdf63..368ce18a0 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -269,28 +269,46 @@ on_query( {ok, Result} end. -on_get_status(InstId, #{pool_name := PoolName}) -> +on_get_status(InstId, State = #{pool_name := PoolName}) -> case health_check(PoolName) of - true -> + ok -> ?tp(debug, emqx_connector_mongo_health_check, #{ instance_id => InstId, status => ok }), connected; - false -> + {error, Reason} -> ?tp(warning, emqx_connector_mongo_health_check, #{ instance_id => InstId, + reason => Reason, status => failed }), - disconnected + {disconnected, State, Reason} end. health_check(PoolName) -> - emqx_resource_pool:health_check_workers( - PoolName, - fun ?MODULE:check_worker_health/1, - ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) - ). + Results = + emqx_resource_pool:health_check_workers( + PoolName, + fun ?MODULE:check_worker_health/1, + ?HEALTH_CHECK_TIMEOUT + timer:seconds(1), + #{return_values => true} + ), + case Results of + {ok, []} -> + {error, worker_processes_dead}; + {ok, Values} -> + case lists:partition(fun(V) -> V =:= ok end, Values) of + {_Ok, []} -> + ok; + {_Ok, [{error, Reason} | _Errors]} -> + {error, Reason}; + {_Ok, [Error | _Errors]} -> + {error, Error} + end; + {error, Reason} -> + {error, Reason} + end. %% =================================================================== @@ -302,9 +320,9 @@ check_worker_health(Conn) -> msg => "mongo_connection_get_status_error", reason => Reason }), - false; + {error, Reason}; _ -> - true + ok catch Class:Error -> ?SLOG(warning, #{ @@ -312,7 +330,7 @@ check_worker_health(Conn) -> class => Class, error => Error }), - false + {error, {Class, Error}} end. do_test_query(Conn) -> diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index ea2240efd..3d464d51e 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -20,7 +20,8 @@ start/3, stop/1, health_check_workers/2, - health_check_workers/3 + health_check_workers/3, + health_check_workers/4 ]). -include_lib("emqx/include/logger.hrl"). @@ -66,9 +67,13 @@ stop(Name) -> end. health_check_workers(PoolName, CheckFunc) -> - health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT). + health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}). health_check_workers(PoolName, CheckFunc, Timeout) -> + health_check_workers(PoolName, CheckFunc, Timeout, _Opts = #{}). + +health_check_workers(PoolName, CheckFunc, Timeout, Opts) -> + ReturnValues = maps:get(return_values, Opts, false), Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], DoPerWorker = fun(Worker) -> @@ -76,18 +81,26 @@ health_check_workers(PoolName, CheckFunc, Timeout) -> {ok, Conn} -> erlang:is_process_alive(Conn) andalso ecpool_worker:exec(Worker, CheckFunc, Timeout); - _ -> - false + Error -> + Error end end, - try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of - [_ | _] = Status -> - lists:all(fun(St) -> St =:= true end, Status); - [] -> - false - catch - exit:timeout -> - false + Results = + try + {ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)} + catch + exit:timeout -> + {error, timeout} + end, + case ReturnValues of + true -> + Results; + false -> + case Results of + {ok, []} -> false; + {ok, Rs = [_ | _]} -> lists:all(fun(St) -> St =:= true end, Rs); + _ -> false + end end. parse_reason({ diff --git a/changes/ee/fix-11107.en.md b/changes/ee/fix-11107.en.md new file mode 100644 index 000000000..d03db1178 --- /dev/null +++ b/changes/ee/fix-11107.en.md @@ -0,0 +1 @@ +Now we return the health check failure reason when probing a MongoDB bridge. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index 72bb0a7dd..e21182591 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -28,7 +28,8 @@ group_tests() -> t_setup_via_http_api_and_publish, t_payload_template, t_collection_template, - t_mongo_date_rule_engine_functions + t_mongo_date_rule_engine_functions, + t_get_status_server_selection_too_short ]. groups() -> @@ -317,6 +318,27 @@ send_message(Config, Payload) -> BridgeID = emqx_bridge_resource:bridge_id(Type, Name), emqx_bridge:send_message(BridgeID, Payload). +probe_bridge_api(Config) -> + probe_bridge_api(Config, _Overrides = #{}). + +probe_bridge_api(Config, Overrides) -> + Name = ?config(mongo_name, Config), + TypeBin = mongo_type_bin(?config(mongo_type, Config)), + MongoConfig0 = ?config(mongo_config, Config), + MongoConfig = emqx_utils_maps:deep_merge(MongoConfig0, Overrides), + Params = MongoConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -453,3 +475,21 @@ t_mongo_date_rule_engine_functions(Config) -> find_all_wait_until_non_empty(Config) ), ok. + +t_get_status_server_selection_too_short(Config) -> + Res = probe_bridge_api( + Config, + #{ + <<"topology">> => #{<<"server_selection_timeout_ms">> => <<"1ms">>} + } + ), + ?assertMatch({error, {{_, 400, _}, _Headers, _Body}}, Res), + {error, {{_, 400, _}, _Headers, Body}} = Res, + ?assertMatch( + #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := <<"timeout">> + }, + emqx_utils_json:decode(Body) + ), + ok.