feat(mongodb): add configurable option to override legacy protocol usage

Fixes https://emqx.atlassian.net/browse/EMQX-10750

Fixes https://github.com/emqx/emqx/discussions/11428

See https://github.com/emqx/mongodb-erlang/pull/39
This commit is contained in:
Thales Macedo Garitezi 2023-08-10 17:01:18 -03:00
parent 65aee8870b
commit d93e1bbf08
9 changed files with 73 additions and 9 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authn, [ {application, emqx_authn, [
{description, "EMQX Authentication"}, {description, "EMQX Authentication"},
{vsn, "0.1.24"}, {vsn, "0.1.25"},
{modules, []}, {modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]}, {registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [ {applications, [

View File

@ -29,7 +29,8 @@ group_tests() ->
t_payload_template, t_payload_template,
t_collection_template, t_collection_template,
t_mongo_date_rule_engine_functions, t_mongo_date_rule_engine_functions,
t_get_status_server_selection_too_short t_get_status_server_selection_too_short,
t_use_legacy_protocol_option
]. ].
groups() -> groups() ->
@ -180,6 +181,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
" replica_set_name = rs0\n" " replica_set_name = rs0\n"
" servers = [~p]\n" " servers = [~p]\n"
" w_mode = safe\n" " w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n" " database = mqtt\n"
" resource_opts = {\n" " resource_opts = {\n"
" query_mode = ~s\n" " query_mode = ~s\n"
@ -205,6 +207,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
" collection = mycol\n" " collection = mycol\n"
" servers = [~p]\n" " servers = [~p]\n"
" w_mode = safe\n" " w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n" " database = mqtt\n"
" resource_opts = {\n" " resource_opts = {\n"
" query_mode = ~s\n" " query_mode = ~s\n"
@ -230,6 +233,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
" collection = mycol\n" " collection = mycol\n"
" server = ~p\n" " server = ~p\n"
" w_mode = safe\n" " w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n" " database = mqtt\n"
" resource_opts = {\n" " resource_opts = {\n"
" query_mode = ~s\n" " query_mode = ~s\n"
@ -286,10 +290,8 @@ clear_db(Config) ->
mongo_api:disconnect(Client). mongo_api:disconnect(Client).
find_all(Config) -> find_all(Config) ->
Type = mongo_type_bin(?config(mongo_type, Config)),
Name = ?config(mongo_name, Config),
#{<<"collection">> := Collection} = ?config(mongo_config, Config), #{<<"collection">> := Collection} = ?config(mongo_config, Config),
ResourceID = emqx_bridge_resource:resource_id(Type, Name), ResourceID = resource_id(Config),
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}). emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
find_all_wait_until_non_empty(Config) -> find_all_wait_until_non_empty(Config) ->
@ -340,6 +342,27 @@ probe_bridge_api(Config, Overrides) ->
ct:pal("bridge probe result: ~p", [Res]), ct:pal("bridge probe result: ~p", [Res]),
Res. Res.
resource_id(Config) ->
Type0 = ?config(mongo_type, Config),
Name = ?config(mongo_name, Config),
Type = mongo_type_bin(Type0),
emqx_bridge_resource:resource_id(Type, Name).
get_worker_pids(Config) ->
ResourceID = resource_id(Config),
%% abusing health check api a bit...
GetWorkerPid = fun(TopologyPid) ->
mongoc:transaction_query(TopologyPid, fun(#{pool := WorkerPid}) -> WorkerPid end)
end,
{ok, WorkerPids = [_ | _]} =
emqx_resource_pool:health_check_workers(
ResourceID,
GetWorkerPid,
5_000,
#{return_values => true}
),
WorkerPids.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -494,3 +517,30 @@ t_get_status_server_selection_too_short(Config) ->
emqx_utils_json:decode(Body) emqx_utils_json:decode(Body)
), ),
ok. ok.
t_use_legacy_protocol_option(Config) ->
ResourceID = resource_id(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids0 = get_worker_pids(Config),
Expected0 = maps:from_keys(WorkerPids0, true),
LegacyOptions0 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids0]),
?assertEqual(Expected0, LegacyOptions0),
{ok, _} = delete_bridge(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"false">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids1 = get_worker_pids(Config),
Expected1 = maps:from_keys(WorkerPids1, false),
LegacyOptions1 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids1]),
?assertEqual(Expected1, LegacyOptions1),
ok.

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"}, {id, "emqx_machine"},
{description, "The EMQX Machine"}, {description, "The EMQX Machine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "0.2.11"}, {vsn, "0.2.12"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_ctl]}, {applications, [kernel, stdlib, emqx_ctl]},

View File

@ -3,5 +3,5 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} {deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_resource, {path, "../../apps/emqx_resource"}}
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.20"}}} , {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.21"}}}
]}. ]}.

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [ {application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"}, {description, "EMQX MongoDB Connector"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -141,6 +141,11 @@ mongo_fields() ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1}, {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1}, {username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1}, {password, fun emqx_connector_schema_lib:password/1},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{ {auth_source, #{
type => binary(), type => binary(),
required => false, required => false,
@ -429,6 +434,8 @@ init_worker_options([{w_mode, V} | R], Acc) ->
init_worker_options(R, [{w_mode, V} | Acc]); init_worker_options(R, [{w_mode, V} | Acc]);
init_worker_options([{r_mode, V} | R], Acc) -> init_worker_options([{r_mode, V} | R], Acc) ->
init_worker_options(R, [{r_mode, V} | Acc]); init_worker_options(R, [{r_mode, V} | Acc]);
init_worker_options([{use_legacy_protocol, V} | R], Acc) ->
init_worker_options(R, [{use_legacy_protocol, V} | Acc]);
init_worker_options([_ | R], Acc) -> init_worker_options([_ | R], Acc) ->
init_worker_options(R, Acc); init_worker_options(R, Acc);
init_worker_options([], Acc) -> init_worker_options([], Acc) ->

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.16"}, {vsn, "5.0.17"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -0,0 +1 @@
Added option to configure detection of legacy protocol in MondoDB connectors and bridges.

View File

@ -149,4 +149,10 @@ wait_queue_timeout.desc:
wait_queue_timeout.label: wait_queue_timeout.label:
"""Wait Queue Timeout""" """Wait Queue Timeout"""
use_legacy_protocol.desc:
"""Whether to use MongoDB's legacy protocol for communicating with the database. The default is to attempt to automatically determine if the newer protocol is supported."""
use_legacy_protocol.label:
"""Use legacy protocol"""
} }