Merge pull request #11429 from thalesmg/mongodb-legacy-opt-20230810
feat(mongodb): add configurable option to override legacy protocol usage
This commit is contained in:
commit
82f27eaf78
|
@ -29,7 +29,8 @@ group_tests() ->
|
|||
t_payload_template,
|
||||
t_collection_template,
|
||||
t_mongo_date_rule_engine_functions,
|
||||
t_get_status_server_selection_too_short
|
||||
t_get_status_server_selection_too_short,
|
||||
t_use_legacy_protocol_option
|
||||
].
|
||||
|
||||
groups() ->
|
||||
|
@ -180,6 +181,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
|
|||
" replica_set_name = rs0\n"
|
||||
" servers = [~p]\n"
|
||||
" w_mode = safe\n"
|
||||
" use_legacy_protocol = auto\n"
|
||||
" database = mqtt\n"
|
||||
" resource_opts = {\n"
|
||||
" query_mode = ~s\n"
|
||||
|
@ -205,6 +207,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
|
|||
" collection = mycol\n"
|
||||
" servers = [~p]\n"
|
||||
" w_mode = safe\n"
|
||||
" use_legacy_protocol = auto\n"
|
||||
" database = mqtt\n"
|
||||
" resource_opts = {\n"
|
||||
" query_mode = ~s\n"
|
||||
|
@ -230,6 +233,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
|
|||
" collection = mycol\n"
|
||||
" server = ~p\n"
|
||||
" w_mode = safe\n"
|
||||
" use_legacy_protocol = auto\n"
|
||||
" database = mqtt\n"
|
||||
" resource_opts = {\n"
|
||||
" query_mode = ~s\n"
|
||||
|
@ -286,10 +290,8 @@ clear_db(Config) ->
|
|||
mongo_api:disconnect(Client).
|
||||
|
||||
find_all(Config) ->
|
||||
Type = mongo_type_bin(?config(mongo_type, Config)),
|
||||
Name = ?config(mongo_name, Config),
|
||||
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
|
||||
ResourceID = resource_id(Config),
|
||||
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
|
||||
|
||||
find_all_wait_until_non_empty(Config) ->
|
||||
|
@ -340,6 +342,27 @@ probe_bridge_api(Config, Overrides) ->
|
|||
ct:pal("bridge probe result: ~p", [Res]),
|
||||
Res.
|
||||
|
||||
resource_id(Config) ->
|
||||
Type0 = ?config(mongo_type, Config),
|
||||
Name = ?config(mongo_name, Config),
|
||||
Type = mongo_type_bin(Type0),
|
||||
emqx_bridge_resource:resource_id(Type, Name).
|
||||
|
||||
get_worker_pids(Config) ->
|
||||
ResourceID = resource_id(Config),
|
||||
%% abusing health check api a bit...
|
||||
GetWorkerPid = fun(TopologyPid) ->
|
||||
mongoc:transaction_query(TopologyPid, fun(#{pool := WorkerPid}) -> WorkerPid end)
|
||||
end,
|
||||
{ok, WorkerPids = [_ | _]} =
|
||||
emqx_resource_pool:health_check_workers(
|
||||
ResourceID,
|
||||
GetWorkerPid,
|
||||
5_000,
|
||||
#{return_values => true}
|
||||
),
|
||||
WorkerPids.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -494,3 +517,30 @@ t_get_status_server_selection_too_short(Config) ->
|
|||
emqx_utils_json:decode(Body)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_use_legacy_protocol_option(Config) ->
|
||||
ResourceID = resource_id(Config),
|
||||
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
|
||||
),
|
||||
WorkerPids0 = get_worker_pids(Config),
|
||||
Expected0 = maps:from_keys(WorkerPids0, true),
|
||||
LegacyOptions0 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids0]),
|
||||
?assertEqual(Expected0, LegacyOptions0),
|
||||
{ok, _} = delete_bridge(Config),
|
||||
|
||||
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"false">>}),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
|
||||
),
|
||||
WorkerPids1 = get_worker_pids(Config),
|
||||
Expected1 = maps:from_keys(WorkerPids1, false),
|
||||
LegacyOptions1 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids1]),
|
||||
?assertEqual(Expected1, LegacyOptions1),
|
||||
|
||||
ok.
|
||||
|
|
|
@ -3,5 +3,5 @@
|
|||
{erl_opts, [debug_info]}.
|
||||
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
|
||||
, {emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.20"}}}
|
||||
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.21"}}}
|
||||
]}.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_mongodb, [
|
||||
{description, "EMQX MongoDB Connector"},
|
||||
{vsn, "0.1.1"},
|
||||
{vsn, "0.1.2"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -141,6 +141,11 @@ mongo_fields() ->
|
|||
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
|
||||
{username, fun emqx_connector_schema_lib:username/1},
|
||||
{password, fun emqx_connector_schema_lib:password/1},
|
||||
{use_legacy_protocol,
|
||||
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
|
||||
default => auto,
|
||||
desc => ?DESC("use_legacy_protocol")
|
||||
})},
|
||||
{auth_source, #{
|
||||
type => binary(),
|
||||
required => false,
|
||||
|
@ -429,6 +434,8 @@ init_worker_options([{w_mode, V} | R], Acc) ->
|
|||
init_worker_options(R, [{w_mode, V} | Acc]);
|
||||
init_worker_options([{r_mode, V} | R], Acc) ->
|
||||
init_worker_options(R, [{r_mode, V} | Acc]);
|
||||
init_worker_options([{use_legacy_protocol, V} | R], Acc) ->
|
||||
init_worker_options(R, [{use_legacy_protocol, V} | Acc]);
|
||||
init_worker_options([_ | R], Acc) ->
|
||||
init_worker_options(R, Acc);
|
||||
init_worker_options([], Acc) ->
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Added option to configure detection of legacy protocol in MondoDB connectors and bridges.
|
|
@ -149,4 +149,10 @@ wait_queue_timeout.desc:
|
|||
wait_queue_timeout.label:
|
||||
"""Wait Queue Timeout"""
|
||||
|
||||
use_legacy_protocol.desc:
|
||||
"""Whether to use MongoDB's legacy protocol for communicating with the database. The default is to attempt to automatically determine if the newer protocol is supported."""
|
||||
|
||||
use_legacy_protocol.label:
|
||||
"""Use legacy protocol"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue