emqx/apps/emqx_mongodb/src/emqx_mongodb.erl

616 lines
19 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mongodb).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
-behaviour(hocon_schema).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_get_status/2,
namespace/0
]).
%% ecpool callback
-export([connect/1]).
-export([roots/0, fields/1, desc/1]).
-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
%% for testing
-export([maybe_resolve_srv_and_txt_records/1]).
-define(HEALTH_CHECK_TIMEOUT, 30000).
%% mongo servers don't need parse
-define(MONGO_HOST_OPTIONS, #{
default_port => ?MONGO_DEFAULT_PORT
}).
%%=====================================================================
%% hocon_schema callback: the namespace under which this connector's
%% schema is registered.
namespace() ->
    "mongo".
%% hocon_schema callback: a single `config' root whose type is the union of
%% the three deployment variants (single, rs, sharded) described by fields/1.
roots() ->
    Variants = [hoconsc:ref(?MODULE, Variant) || Variant <- [single, rs, sharded]],
    [{config, #{type => hoconsc:union(Variants)}}].
%% hocon_schema callback: struct definitions for the MongoDB connector.
%%
%% The "connector_rs" / "connector_sharded" / "connector_single" structs hold
%% the fields specific to each deployment type. The atom structs rs / single /
%% sharded combine those with the shared fields(mongodb) list. fields(topology)
%% lists the optional driver topology tuning options that on_start/2 forwards
%% to the driver.
%% Replica set: several seed servers plus a mandatory replica_set_name.
fields("connector_rs") ->
[
{mongo_type, #{
required => true,
type => rs,
default => rs,
desc => ?DESC("rs_mongo_type")
}},
{servers, servers()},
{w_mode, fun w_mode/1},
{r_mode, fun r_mode/1},
{replica_set_name, fun replica_set_name/1}
];
%% Sharded cluster: several servers, no read mode / replica set name.
fields("connector_sharded") ->
[
{mongo_type, #{
required => true,
type => sharded,
default => sharded,
desc => ?DESC("sharded_mongo_type")
}},
{servers, servers()},
{w_mode, fun w_mode/1}
];
%% Standalone server: a single `server' field instead of `servers'.
fields("connector_single") ->
[
{mongo_type, #{
required => true,
type => single,
default => single,
desc => ?DESC("single_mongo_type")
}},
{server, server()},
{w_mode, fun w_mode/1}
];
%% Full struct for a deployment type: type-specific fields followed by the
%% common ones.
fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
fields("connector_" ++ atom_to_list(Type)) ++ fields(mongodb);
%% Fields shared by every deployment type, followed by the TLS fields.
fields(mongodb) ->
[
{srv_record, fun srv_record/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1},
{password, emqx_connector_schema_lib:password_field()},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{
type => binary(),
required => false,
desc => ?DESC("auth_source")
}},
{database, fun emqx_connector_schema_lib:database/1},
{topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
] ++
emqx_connector_schema_lib:ssl_fields();
%% Optional driver topology options. The *_ms keys are renamed to the
%% driver's camelCase option names by init_topology_options/2.
fields(topology) ->
[
%% Deprecated and hidden: on_start/2 always overrides the topology
%% pool_size with 1, because pooling is done by ecpool instead.
{pool_size,
hoconsc:mk(
pos_integer(),
#{
deprecated => {since, "5.1.1"},
importance => ?IMPORTANCE_HIDDEN
}
)},
{max_overflow, fun max_overflow/1},
{overflow_ttl, duration("overflow_ttl")},
{overflow_check_period, duration("overflow_check_period")},
{local_threshold_ms, duration("local_threshold")},
{connect_timeout_ms, duration("connect_timeout")},
{socket_timeout_ms, duration("socket_timeout")},
{server_selection_timeout_ms, duration("server_selection_timeout")},
{wait_queue_timeout_ms, duration("wait_queue_timeout")},
%% The only topology duration with an explicit default ("200s").
{heartbeat_frequency_ms,
hoconsc:mk(
emqx_schema:timeout_duration_ms(),
#{
default => <<"200s">>,
desc => ?DESC("heartbeat_period")
}
)},
{min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
].
%% hocon_schema callback: description of each struct.
%%
%% The "connector_*" string structs share the description of the matching
%% atom struct. Unknown structs have no description.
desc(Name) when Name =:= single; Name =:= "connector_single" ->
    ?DESC("desc_single");
desc(Name) when Name =:= rs; Name =:= "connector_rs" ->
    ?DESC("desc_rs");
desc(Name) when Name =:= sharded; Name =:= "connector_sharded" ->
    ?DESC("desc_sharded");
desc(topology) ->
    ?DESC("desc_topology");
desc(_) ->
    undefined.
%% ===================================================================
%% emqx_resource callback: this connector declares the `always_sync'
%% callback mode.
callback_mode() ->
    always_sync.
%% emqx_resource callback: starts the connector.
%%
%% Resolves the configured server(s) into a host list (optionally via DNS
%% SRV/TXT lookups), builds the driver topology and worker options, and
%% starts an ecpool of `PoolSize' connections through emqx_resource_pool.
%% Returns `{ok, State}' holding the pool name (the instance id), the
%% deployment type and the target collection (default <<"mqtt">>), or
%% `{error, Reason}' when the pool cannot be started. Exceptions thrown while
%% resolving servers are not caught here.
on_start(
    InstId,
    Config = #{
        mongo_type := Type,
        pool_size := PoolSize,
        ssl := SSL
    }
) ->
    StartMsg =
        case Type of
            single -> "starting_mongodb_single_connector";
            rs -> "starting_mongodb_replica_set_connector";
            sharded -> "starting_mongodb_sharded_connector"
        end,
    ?SLOG(info, #{msg => StartMsg, connector => InstId, config => emqx_utils:redact(Config)}),
    Resolved = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config),
    TlsOpts = ssl_worker_options(SSL),
    Topology = with_single_topology_connection(InstId, maps:get(topology, Resolved, #{})),
    PoolOpts = [
        {mongo_type, init_type(Resolved)},
        {hosts, Hosts},
        {pool_size, PoolSize},
        {options, init_topology_options(maps:to_list(Topology), [])},
        {worker_options, init_worker_options(maps:to_list(Resolved), TlsOpts)}
    ],
    Collection = maps:get(collection, Config, <<"mqtt">>),
    case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of
        ok ->
            {ok, #{pool_name => InstId, type => Type, collection => Collection}};
        {error, _Reason} = Error ->
            Error
    end.

%% Worker-level TLS options derived from the connector's `ssl' config.
ssl_worker_options(SSL) ->
    case maps:get(enable, SSL) of
        true ->
            [{ssl, true}, {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}];
        false ->
            [{ssl, false}]
    end.

%% Forces the driver topology pool size to 1, because ecpool already provides
%% the connection pool. Logs when a different (deprecated) value was configured.
with_single_topology_connection(InstId, Topology) ->
    case maps:get(pool_size, Topology, 1) of
        1 ->
            ok;
        _ ->
            ?SLOG(
                info,
                #{
                    msg => "mongodb_overriding_topology_pool_size",
                    connector => InstId,
                    reason => "this option is deprecated; please set `pool_size' for the connector",
                    value => 1
                }
            )
    end,
    Topology#{pool_size => 1}.
%% emqx_resource callback: stops the ecpool started by on_start/2, which is
%% registered under the instance id.
on_stop(InstId, _State) ->
    LogData = #{connector => InstId, msg => "stopping_mongodb_connector"},
    ?SLOG(info, LogData),
    emqx_resource_pool:stop(InstId).
%% emqx_resource callback: executes a request on the connector's ecpool.
%%
%% Two request shapes are accepted:
%%   * `{ChannelId, Document}': inserts `Document' into the collection stored
%%     in the state by on_start/2. Returns `ok' on success.
%%   * `{Action, Collection, Filter, Projector}': a read dispatched to
%%     mongo_query/5. A cursor result is drained with mc_cursor:foldl/4
%%     (fourth argument 1000, presumably a document cap; TODO confirm).
%%     Documents are prepended, so the resulting list is in reverse cursor
%%     order. Any other result is returned as `{ok, Result}'.
%% Failures are returned as `{error, Reason}'. `ecpool_empty' is tagged as
%% `{recoverable_error, ecpool_empty}' in both shapes.
on_query(
    InstId,
    {_ChannelId, Document},
    #{pool_name := PoolName, collection := Collection} = State
) ->
    Request = {insert, Collection, Document},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_insert, [Collection, Document]},
            no_handover
        )
    of
        {{false, Reason}, _Document} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {error, ecpool_empty} ->
            {error, {recoverable_error, ecpool_empty}};
        {error, Reason} ->
            %% Any other ecpool error previously crashed this clause with a
            %% case_clause; handle it like the read clause below does.
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {{true, _Info}, _Document} ->
            ok
    end;
on_query(
    InstId,
    {Action, Collection, Filter, Projector},
    #{pool_name := PoolName} = State
) ->
    Request = {Action, Collection, Filter, Projector},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_query, [Action, Collection, Filter, Projector]},
            no_handover
        )
    of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            case Reason of
                ecpool_empty ->
                    {error, {recoverable_error, Reason}};
                _ ->
                    {error, Reason}
            end;
        {ok, Cursor} when is_pid(Cursor) ->
            {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
        Result ->
            {ok, Result}
    end.
%% emqx_resource callback: reports `connected' when every pool worker passes
%% health_check/1. Otherwise returns `{disconnected, State, Reason}'. Both
%% outcomes emit an `emqx_connector_mongo_health_check' trace point.
on_get_status(InstId, #{pool_name := PoolName} = State) ->
    case health_check(PoolName) of
        {error, Reason} ->
            ?tp(warning, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                reason => Reason,
                status => failed
            }),
            {disconnected, State, Reason};
        ok ->
            ?tp(debug, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                status => ok
            }),
            connected
    end.
%% Runs check_worker_health/1 on every worker of the pool. The wait is the
%% per-query health-check timeout plus one second. The results are collapsed
%% as follows:
%%   * `ok' if every worker returned `ok';
%%   * `{error, Reason}' for the first failing worker;
%%   * `{error, worker_processes_dead}' for an empty result list;
%%   * a pool-level `{error, _}' is passed through unchanged.
health_check(PoolName) ->
    Outcome = emqx_resource_pool:health_check_workers(
        PoolName,
        fun ?MODULE:check_worker_health/1,
        ?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
        #{return_values => true}
    ),
    case Outcome of
        {error, _} = PoolError ->
            PoolError;
        {ok, []} ->
            {error, worker_processes_dead};
        {ok, Values} ->
            case [Value || Value <- Values, Value =/= ok] of
                [] -> ok;
                [{error, Reason} | _] -> {error, Reason};
                [Unexpected | _] -> {error, Unexpected}
            end
    end.
%% ===================================================================
%% Health probe run on one pool connection. Only the round trip matters:
%% any result other than `{error, _}' counts as healthy. Query errors and
%% exceptions are logged at warning level and returned as `{error, _}'.
check_worker_health(Conn) ->
    try do_test_query(Conn) of
        {error, QueryError} ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_error",
                reason => QueryError
            }),
            {error, QueryError};
        _AnyResult ->
            ok
    catch
        ExcClass:ExcReason ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_exception",
                class => ExcClass,
                error => ExcReason
            }),
            {error, {ExcClass, ExcReason}}
    end.
%% Issues a `find_one' on collection <<"foo">> with an empty selector through
%% mongoc:transaction_query/4, bounded by ?HEALTH_CHECK_TIMEOUT.
do_test_query(Conn) ->
    FindOne =
        fun(#{pool := Worker} = Conf) ->
            ProbeQuery = mongoc:find_one_query(Conf, <<"foo">>, #{}, #{}, 0),
            mc_worker_api:find_one(Worker, ProbeQuery)
        end,
    mongoc:transaction_query(Conn, FindOne, #{}, ?HEALTH_CHECK_TIMEOUT).
%% ecpool callback: opens one MongoDB connection with mongo_api:connect/4.
%%
%% `Opts' is the proplist built by on_start/2. Missing entries default to
%% `single' for the type and `[]' for hosts, options and worker options.
connect(Opts) ->
    Get = fun(Key, Default) -> proplists:get_value(Key, Opts, Default) end,
    mongo_api:connect(
        Get(mongo_type, single),
        Get(hosts, []),
        Get(options, []),
        Get(worker_options, [])
    ).
%% Runs a read on an already-picked connection (called via ecpool from
%% on_query/3).
%%
%% `find' and `find_one' are forwarded to mongo_api:find/4 and
%% mongo_api:find_one/4 respectively. Any other action is rejected with
%% `{error, {unsupported_action, Action}}'. Previously it silently returned
%% `ok', which on_query/3 reported to callers as a successful `{ok, ok}'.
mongo_query(Conn, find, Collection, Filter, Projector) ->
    mongo_api:find(Conn, Collection, Filter, Projector);
mongo_query(Conn, find_one, Collection, Filter, Projector) ->
    mongo_api:find_one(Conn, Collection, Filter, Projector);
mongo_query(_Conn, Action, _Collection, _Filter, _Projector) ->
    {error, {unsupported_action, Action}}.
%% Inserts `Documents' into `Collection' on an already-picked connection
%% (called via ecpool from on_query/3). The result of mongo_api:insert/3 is
%% returned unchanged.
mongo_insert(Conn, Collection, Documents) ->
    mongo_api:insert(
        Conn,
        Collection,
        Documents
    ).
%% Driver topology type for mongo_api:connect/4: `{rs, ReplicaSetName}' for a
%% replica set whose name is known, otherwise the bare `mongo_type' value.
init_type(#{mongo_type := Type} = Config) ->
    case Type =:= rs andalso maps:find(replica_set_name, Config) of
        {ok, SetName} -> {rs, SetName};
        _ -> Type
    end.
%% Converts topology config entries into driver topology options.
%%
%% Recognised keys are renamed to the driver's option names (the *_ms keys
%% become camelCase atoms) and prepended to `Acc'. The output therefore lists
%% them in reverse input order, ahead of the initial `Acc'. Unknown keys and
%% non-pair entries are dropped.
init_topology_options(Entries, Acc0) ->
    lists:foldl(
        fun
            ({Key, Val}, Acc) ->
                case topology_option_name(Key) of
                    drop -> Acc;
                    Name -> [{Name, Val} | Acc]
                end;
            (_Other, Acc) ->
                Acc
        end,
        Acc0,
        Entries
    ).

%% Driver option name for a topology config key, or `drop' if unsupported.
topology_option_name(pool_size) -> pool_size;
topology_option_name(max_overflow) -> max_overflow;
topology_option_name(overflow_ttl) -> overflow_ttl;
topology_option_name(overflow_check_period) -> overflow_check_period;
topology_option_name(local_threshold_ms) -> 'localThresholdMS';
topology_option_name(connect_timeout_ms) -> 'connectTimeoutMS';
topology_option_name(socket_timeout_ms) -> 'socketTimeoutMS';
topology_option_name(server_selection_timeout_ms) -> 'serverSelectionTimeoutMS';
topology_option_name(wait_queue_timeout_ms) -> 'waitQueueTimeoutMS';
topology_option_name(heartbeat_frequency_ms) -> 'heartbeatFrequencyMS';
topology_option_name(min_heartbeat_frequency_ms) -> 'minHeartbeatFrequencyMS';
topology_option_name(_) -> drop.
%% Extracts the driver worker (per-connection) options from the flattened
%% connector config.
%%
%% Kept entries are prepended to `Acc', so they appear in reverse input order
%% ahead of the initial `Acc' (on_start/2 seeds it with the TLS options).
%% `username' is renamed to the driver's `login'. Other unknown keys and
%% non-pair entries are dropped.
init_worker_options(ConfigEntries, Acc0) ->
    lists:foldl(
        fun
            ({Key, Value}, Acc) ->
                case worker_option_name(Key) of
                    drop -> Acc;
                    Name -> [{Name, Value} | Acc]
                end;
            (_Other, Acc) ->
                Acc
        end,
        Acc0,
        ConfigEntries
    ).

%% Worker option name for a connector config key, or `drop' if not forwarded.
worker_option_name(database) -> database;
worker_option_name(auth_source) -> auth_source;
worker_option_name(username) -> login;
worker_option_name(password) -> password;
worker_option_name(w_mode) -> w_mode;
worker_option_name(r_mode) -> r_mode;
worker_option_name(use_legacy_protocol) -> use_legacy_protocol;
worker_option_name(_) -> drop.
%% ===================================================================
%% Schema funcs
%% Schema of the single `server' field (default port from ?MONGO_HOST_OPTIONS).
server() ->
    emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MONGO_HOST_OPTIONS).
%% Schema of the multi-host `servers' field (default port from
%% ?MONGO_HOST_OPTIONS).
servers() ->
    emqx_schema:servers_sc(#{desc => ?DESC("servers")}, ?MONGO_HOST_OPTIONS).
%% Field schema for `w_mode': enum `unsafe' | `safe', default `unsafe'.
%% The value is forwarded to the driver worker options.
w_mode(default) -> unsafe;
w_mode(type) -> hoconsc:enum([unsafe, safe]);
w_mode(desc) -> ?DESC("w_mode");
w_mode(_) -> undefined.
%% Field schema for `r_mode': enum `master' | `slave_ok', default `master'.
%% The value is forwarded to the driver worker options.
r_mode(default) -> master;
r_mode(type) -> hoconsc:enum([master, slave_ok]);
r_mode(desc) -> ?DESC("r_mode");
r_mode(_) -> undefined.
%% Schema of an optional millisecond-duration field described by `DescId'.
duration(DescId) ->
    #{
        desc => ?DESC(DescId),
        required => false,
        type => emqx_schema:timeout_duration_ms()
    }.
%% Field schema for topology `max_overflow': non-negative integer,
%% default 0.
max_overflow(default) -> 0;
max_overflow(type) -> non_neg_integer();
max_overflow(desc) -> ?DESC("max_overflow");
max_overflow(_) -> undefined.
%% Field schema for `replica_set_name': a required binary (rs deployments).
replica_set_name(required) -> true;
replica_set_name(type) -> binary();
replica_set_name(desc) -> ?DESC("replica_set_name");
replica_set_name(_) -> undefined.
%% Field schema for `srv_record': boolean, default false. When true, the
%% configured server is resolved through DNS SRV/TXT records.
srv_record(default) -> false;
srv_record(type) -> boolean();
srv_record(desc) -> ?DESC("srv_record");
srv_record(_) -> undefined.
%% ===================================================================
%% Internal funcs
%% Entry point for host resolution. Takes the `server' (single) or `servers'
%% (rs/sharded) value out of the config and hands it, with the remaining
%% config, to maybe_resolve_srv_and_txt_records1/2. `server' is checked first.
maybe_resolve_srv_and_txt_records(#{server := SingleServer} = Config) ->
    Rest = maps:remove(server, Config),
    maybe_resolve_srv_and_txt_records1(SingleServer, Rest);
maybe_resolve_srv_and_txt_records(#{servers := ServerList} = Config) ->
    Rest = maps:remove(servers, Config),
    maybe_resolve_srv_and_txt_records1(ServerList, Rest).
%% Turns the configured server(s) into the `hosts' list used by on_start/2.
%%
%% With `srv_record => false', the servers are parsed and formatted as
%% "host:port" binaries.
%%
%% With `srv_record => true', only the first configured server is used, as a
%% DNS name:
%%   * its `_mongodb._tcp' SRV records provide the hosts;
%%   * options from its TXT record (authSource, plus replicaSet for `rs') are
%%     merged over the config.
%%
%% In both modes an `rs' deployment must end up with a `replica_set_name'.
%% Otherwise `#{reason => "missing_parameter", param => replica_set_name}' is
%% thrown. Previously the SRV path skipped this check.
maybe_resolve_srv_and_txt_records1(
    Servers0,
    #{
        mongo_type := Type,
        srv_record := false
    } = Config
) ->
    ok = ensure_replica_set_name(Type, Config),
    Servers = parse_servers(Servers0),
    Config#{hosts => format_hosts(Servers)};
maybe_resolve_srv_and_txt_records1(
    Servers,
    #{
        mongo_type := Type,
        srv_record := true
    } = Config
) ->
    %% when srv is in use, it's typically only one DNS resolution needed,
    %% however, by the schema definition, it's allowed to configure more than one.
    %% here we keep only the first
    [{DNS, _IgnorePort} | _] = parse_servers(Servers),
    DnsRecords = resolve_srv_records(DNS),
    Hosts = format_hosts(DnsRecords),
    ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}),
    ExtraOpts = resolve_txt_records(Type, DNS),
    ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => ExtraOpts}),
    %% TXT-provided options take precedence over the configured ones.
    Resolved = maps:merge(Config#{hosts => Hosts}, ExtraOpts),
    ok = ensure_replica_set_name(Type, Resolved),
    Resolved.

%% Returns `ok' unless `Type' is `rs' and `Config' has no replica_set_name,
%% in which case a `missing_parameter' error map is thrown.
ensure_replica_set_name(rs, Config) ->
    case maps:is_key(replica_set_name, Config) of
        true ->
            ok;
        false ->
            throw(#{
                reason => "missing_parameter",
                param => replica_set_name
            })
    end;
ensure_replica_set_name(_Type, _Config) ->
    ok.
%% Looks up the SRV records of "_mongodb._tcp." ++ DNS0 and returns them as a
%% non-empty list of `{Host, Port}' pairs. Entries that are not 4-tuples are
%% skipped. Throws `#{reason => "failed_to_resolve_srv_record", dns => Name}'
%% when nothing usable is found.
resolve_srv_records(DNS0) ->
    SrvName = "_mongodb._tcp." ++ DNS0,
    Records = emqx_connector_lib:resolve_dns(SrvName, srv),
    ToHostPort = fun
        ({_Priority, _Weight, Port, Host}) -> {true, {Host, Port}};
        (_) -> false
    end,
    case lists:filtermap(ToHostPort, Records) of
        [] ->
            throw(#{
                reason => "failed_to_resolve_srv_record",
                dns => SrvName
            });
        HostPorts ->
            HostPorts
    end.
%% Reads extra connection options from the TXT record of `DNS'.
%%
%% Returns `#{}' when there is no TXT record. A single record is parsed as a
%% URI query string (e.g. "authSource=admin&replicaSet=...") and converted by
%% convert_options/2.
%%
%% A TXT record may be split into several character-strings, because DNS
%% limits each one to 255 bytes. Those strings are now concatenated instead
%% of being misreported as `multiple_txt_records'.
%%
%% Throws `bad_txt_record_resolution' for an unparsable query string, and
%% `multiple_txt_records' when more than one record is returned.
%%
%% NOTE(review): assumes emqx_connector_lib:resolve_dns/2 returns one list of
%% strings per record, as inet_res:lookup/3 does for `txt'. TODO confirm.
resolve_txt_records(Type, DNS) ->
    case emqx_connector_lib:resolve_dns(DNS, txt) of
        [] ->
            #{};
        [Strings] = L when is_list(Strings) ->
            %% lists:append([S]) =:= S, so single-string records are unchanged.
            QueryString = lists:append(Strings),
            case uri_string:dissect_query(QueryString) of
                {error, _, _} ->
                    throw(#{
                        reason => "bad_txt_record_resolution",
                        resolved => L
                    });
                Options ->
                    convert_options(Type, normalize_options(Options))
            end;
        L ->
            throw(#{
                reason => "multiple_txt_records",
                resolved => L
            })
    end.
%% Lower-cases the option names of a dissected query string so that lookups
%% are case-insensitive. Values and order are preserved.
normalize_options(Options) ->
    lists:map(
        fun({Name, Value}) -> {string:lowercase(Name), Value} end,
        Options
    ).
%% Maps TXT-record options onto connector config keys. `authSource' is always
%% considered. `replicaSet' is only considered for replica-set deployments.
%% Options that are absent are simply not included.
convert_options(Type, Options) ->
    Wanted =
        case Type of
            rs -> [{auth_source, "authSource"}, {replica_set_name, "replicaSet"}];
            _ -> [{auth_source, "authSource"}]
        end,
    lists:foldl(
        fun({ConfigKey, OptName}, Acc) ->
            maps:merge(Acc, maybe_add_option(ConfigKey, OptName, Options))
        end,
        #{},
        Wanted
    ).
%% Looks up option `OptName0' (case-insensitively, against already
%% lower-cased names) and returns `#{ConfigKey => Value}' with the value
%% converted to a binary, or `#{}' when absent. The first match wins.
maybe_add_option(ConfigKey, OptName0, Options) ->
    Needle = string:lowercase(OptName0),
    case lists:keyfind(Needle, 1, Options) of
        false ->
            #{};
        {_, RawValue} ->
            #{ConfigKey => iolist_to_binary(RawValue)}
    end.
%% Renders a `{Host, Port}' pair as a <<"host:port">> binary.
format_host({Host, Port}) ->
    <<(iolist_to_binary(Host))/binary, $:, (integer_to_binary(Port))/binary>>.
%% Renders each `{Host, Port}' pair with format_host/1, preserving order.
format_hosts(Hosts) ->
    [format_host(HostPort) || HostPort <- Hosts].
%% Parses a servers config value with emqx_schema:parse_servers/2 (default
%% port from ?MONGO_HOST_OPTIONS) into `{Hostname, Port}' pairs. Crashes on
%% any parsed entry that lacks a hostname or port.
parse_servers(HoconValue) ->
    Parsed = emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS),
    ToPair = fun(#{hostname := Hostname, port := Port}) -> {Hostname, Port} end,
    [ToPair(Entry) || Entry <- Parsed].