%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mongodb).

-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

-behaviour(emqx_resource).
-behaviour(hocon_schema).

%% callbacks of behaviour emqx_resource
-export([
    callback_mode/0,
    on_start/2,
    on_stop/2,
    on_query/3,
    on_get_status/2,
    namespace/0
]).

%% ecpool callback
-export([connect/1]).

-export([roots/0, fields/1, desc/1]).

-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).

%% for testing
-export([maybe_resolve_srv_and_txt_records/1]).

%% Timeout (in milliseconds) handed to the health-check test query.
-define(HEALTH_CHECK_TIMEOUT, 30000).

%% mongo servers don't need parse
-define(MONGO_HOST_OPTIONS, #{
    default_port => ?MONGO_DEFAULT_PORT
}).

%%=====================================================================

%% Schema namespace of this module.
namespace() -> "mongo".

%% Schema roots: a single `config' root whose type is the union of the
%% `single', `rs' and `sharded' structs defined by fields/1.
roots() ->
    [
        {config, #{
            type => hoconsc:union(
                [
                    hoconsc:ref(?MODULE, single),
                    hoconsc:ref(?MODULE, rs),
                    hoconsc:ref(?MODULE, sharded)
                ]
            )
        }}
    ].
%% Schema field definitions.
%%
%% The "connector_*" structs carry the deployment-specific fields
%% (mongo_type, server(s), write/read mode, replica set name). The `rs',
%% `single' and `sharded' structs are the matching "connector_*" struct
%% followed by the common `mongodb' fields. `topology' holds the optional
%% driver topology settings.
fields("connector_rs") ->
    [
        {mongo_type, #{
            required => true,
            type => rs,
            default => rs,
            desc => ?DESC("rs_mongo_type")
        }},
        {servers, servers()},
        {w_mode, fun w_mode/1},
        {r_mode, fun r_mode/1},
        {replica_set_name, fun replica_set_name/1}
    ];
fields("connector_sharded") ->
    [
        {mongo_type, #{
            required => true,
            type => sharded,
            default => sharded,
            desc => ?DESC("sharded_mongo_type")
        }},
        {servers, servers()},
        {w_mode, fun w_mode/1}
    ];
fields("connector_single") ->
    [
        {mongo_type, #{
            required => true,
            type => single,
            default => single,
            desc => ?DESC("single_mongo_type")
        }},
        {server, server()},
        {w_mode, fun w_mode/1}
    ];
fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
    fields("connector_" ++ atom_to_list(Type)) ++ fields(mongodb);
fields(mongodb) ->
    [
        {srv_record, fun srv_record/1},
        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
        {username, fun emqx_connector_schema_lib:username/1},
        {password, emqx_connector_schema_lib:password_field()},
        {use_legacy_protocol,
            hoconsc:mk(hoconsc:enum([auto, true, false]), #{
                default => auto,
                desc => ?DESC("use_legacy_protocol")
            })},
        {auth_source, #{
            type => binary(),
            required => false,
            desc => ?DESC("auth_source")
        }},
        {database, fun emqx_connector_schema_lib:database/1},
        {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
    ] ++
        emqx_connector_schema_lib:ssl_fields();
fields(topology) ->
    [
        %% Deprecated and hidden: on_start/2 always forces the topology
        %% pool size to 1.
        {pool_size,
            hoconsc:mk(
                pos_integer(),
                #{
                    deprecated => {since, "5.1.1"},
                    importance => ?IMPORTANCE_HIDDEN
                }
            )},
        {max_overflow, fun max_overflow/1},
        {overflow_ttl, duration("overflow_ttl")},
        {overflow_check_period, duration("overflow_check_period")},
        {local_threshold_ms, duration("local_threshold")},
        {connect_timeout_ms, duration("connect_timeout")},
        {socket_timeout_ms, duration("socket_timeout")},
        {server_selection_timeout_ms, duration("server_selection_timeout")},
        {wait_queue_timeout_ms, duration("wait_queue_timeout")},
        {heartbeat_frequency_ms,
            hoconsc:mk(
                emqx_schema:timeout_duration_ms(),
                #{
                    default => <<"200s">>,
                    desc => ?DESC("heartbeat_period")
                }
            )},
        {min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
    ].

%% Description reference for each known struct name; `undefined' for
%% anything else.
desc("connector_single") ->
    ?DESC("desc_single");
desc("connector_rs") ->
    ?DESC("desc_rs");
desc("connector_sharded") ->
    ?DESC("desc_sharded");
desc(single) ->
    ?DESC("desc_single");
desc(rs) ->
    ?DESC("desc_rs");
desc(sharded) ->
    ?DESC("desc_sharded");
desc(topology) ->
    ?DESC("desc_topology");
desc(_) ->
    undefined.

%% ===================================================================

%% emqx_resource callback: this connector always runs in `always_sync' mode.
callback_mode() -> always_sync.

%% emqx_resource callback: starts the worker pool for connector `InstId'.
%%
%% `Config' must contain `mongo_type', `pool_size' and `ssl'; it must also
%% contain `server' or `servers' and `srv_record', which
%% maybe_resolve_srv_and_txt_records/1 consumes to produce `hosts'.
%%
%% Returns `{ok, State}' where `State' holds `pool_name', `type' and
%% `collection' (defaults to <<"mqtt">> when not configured), or
%% `{error, Reason}' when the pool fails to start.
on_start(
    InstId,
    Config = #{
        mongo_type := Type,
        pool_size := PoolSize,
        ssl := SSL
    }
) ->
    Msg =
        case Type of
            single -> "starting_mongodb_single_connector";
            rs -> "starting_mongodb_replica_set_connector";
            sharded -> "starting_mongodb_sharded_connector"
        end,
    ?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_utils:redact(Config)}),
    NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config),
    SslOpts =
        case maps:get(enable, SSL) of
            true ->
                [
                    {ssl, true},
                    {ssl_opts, emqx_tls_lib:to_client_opts(SSL)}
                ];
            false ->
                [{ssl, false}]
        end,
    Topology0 = maps:get(topology, NConfig, #{}),
    %% we fix this at 1 because we already have ecpool
    case maps:get(pool_size, Topology0, 1) =:= 1 of
        true ->
            ok;
        false ->
            ?SLOG(
                info,
                #{
                    msg => "mongodb_overriding_topology_pool_size",
                    connector => InstId,
                    reason => "this option is deprecated; please set `pool_size' for the connector",
                    value => 1
                }
            )
    end,
    Topology = Topology0#{pool_size => 1},
    Opts = [
        {mongo_type, init_type(NConfig)},
        {hosts, Hosts},
        {pool_size, PoolSize},
        {options, init_topology_options(maps:to_list(Topology), [])},
        {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
    ],
    Collection = maps:get(collection, Config, <<"mqtt">>),
    case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
        ok ->
            {ok, #{
                pool_name => InstId,
                type => Type,
                collection => Collection
            }};
        {error, Reason} ->
            {error, Reason}
    end.
%% emqx_resource callback: logs and stops the worker pool named `InstId'.
on_stop(InstId, _State) ->
    ?SLOG(info, #{
        msg => "stopping_mongodb_connector",
        connector => InstId
    }),
    emqx_resource_pool:stop(InstId).

%% emqx_resource callback: runs a request on a pooled worker.
%%
%% Two request shapes are accepted:
%%   * `{ChannelId, Document}' - inserts `Document' into the collection
%%     stored in the connector state. Returns `ok' or `{error, Reason}'.
%%   * `{Action, Collection, Filter, Projector}' - dispatched to
%%     mongo_query/5. Returns `{ok, Result}' or `{error, Reason}'; when the
%%     result is a cursor pid, it is folded into a list first.
%%
%% An empty pool is reported as `{error, {recoverable_error, ecpool_empty}}'
%% for both shapes.
on_query(
    InstId,
    {_ChannelId, Document},
    #{pool_name := PoolName, collection := Collection} = State
) ->
    Request = {insert, Collection, Document},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_insert, [Collection, Document]},
            no_handover
        )
    of
        {{false, Reason}, _Document} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {error, ecpool_empty} ->
            {error, {recoverable_error, ecpool_empty}};
        {error, Reason} ->
            %% Any other pool-level failure: report it the same way the
            %% find/find_one clause does instead of failing with case_clause.
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            {error, Reason};
        {{true, _Info}, _Document} ->
            ok
    end;
on_query(
    InstId,
    {Action, Collection, Filter, Projector},
    #{pool_name := PoolName} = State
) ->
    Request = {Action, Collection, Filter, Projector},
    ?TRACE(
        "QUERY",
        "mongodb_connector_received",
        #{request => Request, connector => InstId, state => State}
    ),
    case
        ecpool:pick_and_do(
            PoolName,
            {?MODULE, mongo_query, [Action, Collection, Filter, Projector]},
            no_handover
        )
    of
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "mongodb_connector_do_query_failed",
                request => Request,
                reason => Reason,
                connector => InstId
            }),
            case Reason of
                ecpool_empty ->
                    {error, {recoverable_error, Reason}};
                _ ->
                    {error, Reason}
            end;
        {ok, Cursor} when is_pid(Cursor) ->
            %% Each folded element is prepended to the accumulator.
            %% NOTE(review): 1000 is presumably a timeout or batch argument
            %% of mc_cursor:foldl/4 - confirm against the driver.
            {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
        Result ->
            {ok, Result}
    end.

%% emqx_resource callback: returns `connected' when health_check/1 reports
%% `ok', otherwise `{disconnected, State, Reason}'. Both outcomes emit an
%% `emqx_connector_mongo_health_check' trace point.
on_get_status(InstId, State = #{pool_name := PoolName}) ->
    case health_check(PoolName) of
        ok ->
            ?tp(debug, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                status => ok
            }),
            connected;
        {error, Reason} ->
            ?tp(warning, emqx_connector_mongo_health_check, #{
                instance_id => InstId,
                reason => Reason,
                status => failed
            }),
            {disconnected, State, Reason}
    end.
%% Runs check_worker_health/1 on the workers of `PoolName' and reduces the
%% collected values to `ok' or `{error, Reason}'.
%%
%% An empty result list is reported as `{error, worker_processes_dead}'.
%% Otherwise the first non-`ok' value decides the result: an
%% `{error, Reason}' value is returned as is, any other value is wrapped
%% as `{error, Value}'.
health_check(PoolName) ->
    Results =
        emqx_resource_pool:health_check_workers(
            PoolName,
            fun ?MODULE:check_worker_health/1,
            %% one second more than the timeout given to the test query itself
            ?HEALTH_CHECK_TIMEOUT + timer:seconds(1),
            #{return_values => true}
        ),
    case Results of
        {ok, []} ->
            {error, worker_processes_dead};
        {ok, Values} ->
            case lists:partition(fun(V) -> V =:= ok end, Values) of
                {_Ok, []} ->
                    ok;
                {_Ok, [{error, Reason} | _Errors]} ->
                    {error, Reason};
                {_Ok, [Error | _Errors]} ->
                    {error, Error}
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% ===================================================================

%% Health probe for a single connection. Returns `ok' unless the test query
%% returns `{error, Reason}' or raises, in which case a warning is logged
%% and `{error, Reason}' / `{error, {Class, Error}}' is returned.
check_worker_health(Conn) ->
    %% we don't care if this returns something or not, we just want to test the connection
    try do_test_query(Conn) of
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_error",
                reason => Reason
            }),
            {error, Reason};
        _ ->
            ok
    catch
        Class:Error ->
            ?SLOG(warning, #{
                msg => "mongo_connection_get_status_exception",
                class => Class,
                error => Error
            }),
            {error, {Class, Error}}
    end.

%% Issues a `find_one' on collection <<"foo">> with an empty selector and
%% an empty projector, using ?HEALTH_CHECK_TIMEOUT as the transaction
%% timeout.
do_test_query(Conn) ->
    mongoc:transaction_query(
        Conn,
        fun(Conf = #{pool := Worker}) ->
            Query = mongoc:find_one_query(Conf, <<"foo">>, #{}, #{}, 0),
            mc_worker_api:find_one(Worker, Query)
        end,
        #{},
        ?HEALTH_CHECK_TIMEOUT
    ).

%% ecpool callback: opens a connection from the option list built in
%% on_start/2. Missing keys fall back to `single' / empty lists.
connect(Opts) ->
    Type = proplists:get_value(mongo_type, Opts, single),
    Hosts = proplists:get_value(hosts, Opts, []),
    Options = proplists:get_value(options, Opts, []),
    WorkerOptions = proplists:get_value(worker_options, Opts, []),
    mongo_api:connect(Type, Hosts, Options, WorkerOptions).

%% Dispatches `find' and `find_one' to mongo_api. Any other action is not
%% implemented yet and returns `ok' without touching the connection.
mongo_query(Conn, find, Collection, Filter, Projector) ->
    mongo_api:find(Conn, Collection, Filter, Projector);
mongo_query(Conn, find_one, Collection, Filter, Projector) ->
    mongo_api:find_one(Conn, Collection, Filter, Projector);
%% Todo xxx
mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) ->
    ok.

%% Inserts `Documents' into `Collection' through mongo_api.
mongo_insert(Conn, Collection, Documents) ->
    mongo_api:insert(Conn, Collection, Documents).
%% Connection type for the driver: `{rs, ReplicaSetName}' for a replica set
%% that has a `replica_set_name', otherwise the bare `mongo_type' value.
init_type(#{mongo_type := Type} = Config) ->
    case Config of
        #{replica_set_name := ReplicaSetName} when Type =:= rs ->
            {rs, ReplicaSetName};
        _ ->
            Type
    end.

%% Translates topology settings into driver options, prepending each
%% recognised `{Key, Value}' pair to `Acc0' in input order (so the last
%% recognised input ends up first). Unrecognised elements are dropped.
init_topology_options(Options, Acc0) ->
    lists:foldl(fun collect_topology_option/2, Acc0, Options).

%% Fold step: prepend the translated pair, or skip the element.
collect_topology_option({Key, Val}, Acc) ->
    case topology_option_name(Key) of
        {ok, Name} -> [{Name, Val} | Acc];
        skip -> Acc
    end;
collect_topology_option(_Unrecognised, Acc) ->
    Acc.

%% Config key -> driver option name; `skip' for keys that are not
%% topology options.
topology_option_name(pool_size) -> {ok, pool_size};
topology_option_name(max_overflow) -> {ok, max_overflow};
topology_option_name(overflow_ttl) -> {ok, overflow_ttl};
topology_option_name(overflow_check_period) -> {ok, overflow_check_period};
topology_option_name(local_threshold_ms) -> {ok, 'localThresholdMS'};
topology_option_name(connect_timeout_ms) -> {ok, 'connectTimeoutMS'};
topology_option_name(socket_timeout_ms) -> {ok, 'socketTimeoutMS'};
topology_option_name(server_selection_timeout_ms) -> {ok, 'serverSelectionTimeoutMS'};
topology_option_name(wait_queue_timeout_ms) -> {ok, 'waitQueueTimeoutMS'};
topology_option_name(heartbeat_frequency_ms) -> {ok, 'heartbeatFrequencyMS'};
topology_option_name(min_heartbeat_frequency_ms) -> {ok, 'minHeartbeatFrequencyMS'};
topology_option_name(_) -> skip.
%% Builds the worker option list from the connector config pairs.
%% Recognised pairs are prepended to `Acc' one by one; `username' is
%% passed on under the `login' key, every other recognised key keeps its
%% name. Unrecognised elements are dropped.
init_worker_options([{database, V} | R], Acc) ->
    init_worker_options(R, [{database, V} | Acc]);
init_worker_options([{auth_source, V} | R], Acc) ->
    init_worker_options(R, [{auth_source, V} | Acc]);
init_worker_options([{username, V} | R], Acc) ->
    init_worker_options(R, [{login, V} | Acc]);
init_worker_options([{password, Secret} | R], Acc) ->
    init_worker_options(R, [{password, Secret} | Acc]);
init_worker_options([{w_mode, V} | R], Acc) ->
    init_worker_options(R, [{w_mode, V} | Acc]);
init_worker_options([{r_mode, V} | R], Acc) ->
    init_worker_options(R, [{r_mode, V} | Acc]);
init_worker_options([{use_legacy_protocol, V} | R], Acc) ->
    init_worker_options(R, [{use_legacy_protocol, V} | Acc]);
init_worker_options([_ | R], Acc) ->
    init_worker_options(R, Acc);
init_worker_options([], Acc) ->
    Acc.

%% ===================================================================
%% Schema funcs

%% Schema for the `server' field of a single-node deployment.
server() ->
    Meta = #{desc => ?DESC("server")},
    emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS).

%% Schema for the `servers' field of replica-set and sharded deployments.
servers() ->
    Meta = #{desc => ?DESC("servers")},
    emqx_schema:servers_sc(Meta, ?MONGO_HOST_OPTIONS).

%% Write mode: `unsafe' (default) or `safe'.
w_mode(type) -> hoconsc:enum([unsafe, safe]);
w_mode(desc) -> ?DESC("w_mode");
w_mode(default) -> unsafe;
w_mode(_) -> undefined.

%% Read mode: `master' (default) or `slave_ok'.
r_mode(type) -> hoconsc:enum([master, slave_ok]);
r_mode(desc) -> ?DESC("r_mode");
r_mode(default) -> master;
r_mode(_) -> undefined.

%% Optional millisecond duration field described by `Desc'.
duration(Desc) ->
    #{
        type => emqx_schema:timeout_duration_ms(),
        required => false,
        desc => ?DESC(Desc)
    }.

%% Non-negative integer, default 0.
max_overflow(type) -> non_neg_integer();
max_overflow(desc) -> ?DESC("max_overflow");
max_overflow(default) -> 0;
max_overflow(_) -> undefined.

%% Required binary.
replica_set_name(type) -> binary();
replica_set_name(desc) -> ?DESC("replica_set_name");
replica_set_name(required) -> true;
replica_set_name(_) -> undefined.

%% Boolean, default false.
srv_record(type) -> boolean();
srv_record(desc) -> ?DESC("srv_record");
srv_record(default) -> false;
srv_record(_) -> undefined.
%% ===================================================================
%% Internal funcs

%% Replaces the `server' / `servers' entry of `Config' with a `hosts' list
%% of <<"Host:Port">> binaries.
%%
%% With `srv_record := false' the configured servers are used directly.
%% With `srv_record := true' the first configured server is treated as a
%% DNS name whose SRV records provide the hosts, and whose TXT record (if
%% any) may add `auth_source' and, for replica sets, `replica_set_name'.
%%
%% Throws a map with a `reason' key when a replica set has no
%% `replica_set_name' (non-SRV case), when no SRV record is found, or when
%% the TXT lookup is unusable.
maybe_resolve_srv_and_txt_records(#{server := Server} = Config) ->
    NConfig = maps:remove(server, Config),
    maybe_resolve_srv_and_txt_records1(Server, NConfig);
maybe_resolve_srv_and_txt_records(#{servers := Servers} = Config) ->
    NConfig = maps:remove(servers, Config),
    maybe_resolve_srv_and_txt_records1(Servers, NConfig).

maybe_resolve_srv_and_txt_records1(
    Servers0,
    #{
        mongo_type := Type,
        srv_record := false
    } = Config
) ->
    case Type =:= rs andalso maps:is_key(replica_set_name, Config) =:= false of
        true ->
            throw(#{
                reason => "missing_parameter",
                param => replica_set_name
            });
        false ->
            Servers = parse_servers(Servers0),
            Config#{hosts => format_hosts(Servers)}
    end;
maybe_resolve_srv_and_txt_records1(
    Servers,
    #{
        mongo_type := Type,
        srv_record := true
    } = Config
) ->
    %% when srv is in use, it's typically only one DNS resolution needed,
    %% however, by the schema definition, it's allowed to configure more than one.
    %% here we keep only the first
    [{DNS, _IgnorePort} | _] = parse_servers(Servers),
    DnsRecords = resolve_srv_records(DNS),
    Hosts = format_hosts(DnsRecords),
    ?tp(info, resolved_srv_records, #{dns => DNS, resolved_hosts => Hosts}),
    ExtraOpts = resolve_txt_records(Type, DNS),
    ?tp(info, resolved_txt_records, #{dns => DNS, resolved_options => ExtraOpts}),
    %% options found in the TXT record take precedence over configured ones
    maps:merge(Config#{hosts => Hosts}, ExtraOpts).

%% Resolves the SRV records of "_mongodb._tcp." ++ DNS0 into a list of
%% `{Host, Port}' pairs; throws when the list is empty.
resolve_srv_records(DNS0) ->
    DNS = "_mongodb._tcp." ++ DNS0,
    DnsData = emqx_connector_lib:resolve_dns(DNS, srv),
    case [{Host, Port} || {_, _, Port, Host} <- DnsData] of
        [] ->
            throw(#{
                reason => "failed_to_resolve_srv_record",
                dns => DNS
            });
        L ->
            L
    end.

%% Resolves the TXT record of `DNS' into a map of extra config options.
%% No record yields an empty map; more than one record, or a record that
%% cannot be parsed as a query string, throws.
resolve_txt_records(Type, DNS) ->
    case emqx_connector_lib:resolve_dns(DNS, txt) of
        [] ->
            #{};
        [[QueryString]] = L ->
            %% e.g. "authSource=admin&replicaSet=atlas-wrnled-shard-0"
            case uri_string:dissect_query(QueryString) of
                {error, _, _} ->
                    throw(#{
                        reason => "bad_txt_record_resolution",
                        resolved => L
                    });
                Options ->
                    convert_options(Type, normalize_options(Options))
            end;
        L ->
            throw(#{
                reason => "multiple_txt_records",
                resolved => L
            })
    end.

%% Lower-cases every option name so lookups are case-insensitive.
normalize_options([]) ->
    [];
normalize_options([{Name, Value} | Options]) ->
    [{string:lowercase(Name), Value} | normalize_options(Options)].

%% Picks the options relevant for the deployment type: `authSource' for
%% all types, plus `replicaSet' for replica sets.
convert_options(rs, Options) ->
    M1 = maybe_add_option(auth_source, "authSource", Options),
    M2 = maybe_add_option(replica_set_name, "replicaSet", Options),
    maps:merge(M1, M2);
convert_options(_, Options) ->
    maybe_add_option(auth_source, "authSource", Options).

%% Returns `#{ConfigKey => Value}' (value as a binary) when the lower-cased
%% `OptName0' is present in `Options', otherwise an empty map.
maybe_add_option(ConfigKey, OptName0, Options) ->
    OptName = string:lowercase(OptName0),
    case lists:keyfind(OptName, 1, Options) of
        {_, OptValue} ->
            #{ConfigKey => iolist_to_binary(OptValue)};
        false ->
            #{}
    end.

%% Formats one `{Host, Port}' pair as a <<"Host:Port">> binary.
format_host({Host, Port}) ->
    iolist_to_binary([Host, ":", integer_to_list(Port)]).

%% Formats a list of `{Host, Port}' pairs.
format_hosts(Hosts) ->
    lists:map(fun format_host/1, Hosts).

%% Parses the configured server value into `{Host, Port}' pairs using the
%% MongoDB host options.
parse_servers(HoconValue) ->
    lists:map(
        fun(#{hostname := Host, port := Port}) ->
            {Host, Port}
        end,
        emqx_schema:parse_servers(HoconValue, ?MONGO_HOST_OPTIONS)
    ).