Merge pull request #11963 from sstrigler/EMQX-11156-bridge-v-2-mongo-db-support

EMQX 11156 bridge v2 mongo db support
This commit is contained in:
Stefan Strigler 2023-11-24 16:49:57 +01:00 committed by GitHub
commit f8f8cf9f30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 810 additions and 198 deletions

View File

@ -77,6 +77,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_confluent_producer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_syskeeper_action_info
].
-else.
@ -116,14 +117,17 @@ bridge_v1_type_to_action_type(Type) ->
action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
action_type_to_bridge_v1_type(ActionType, Conf) ->
action_type_to_bridge_v1_type(ActionType, ActionConf) ->
ActionInfoMap = info_map(),
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
undefined ->
ActionType;
BridgeV1TypeFun when is_function(BridgeV1TypeFun) ->
BridgeV1TypeFun(get_confs(ActionType, Conf));
case get_confs(ActionType, ActionConf) of
{ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig});
undefined -> ActionType
end;
BridgeV1Type ->
BridgeV1Type
end.
@ -131,7 +135,9 @@ action_type_to_bridge_v1_type(ActionType, Conf) ->
get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
ConnectorType = action_type_to_connector_type(ActionType),
ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
{ActionConfig, ConnectorConfig}.
{ConnectorConfig, ActionConfig};
get_confs(_, _) ->
undefined.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2

View File

@ -237,9 +237,15 @@ send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) ->
send_to_matched_egress_bridges_loop(Topic, Msg, Ids).
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
send_message(BridgeType, BridgeName, ResId, Message, #{}).
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
end.
send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
@ -377,8 +383,8 @@ disable_enable(Action, BridgeType0, BridgeName) when
)
end.
create(BridgeType0, BridgeName, RawConf) ->
BridgeType = upgrade_type(BridgeType0),
create(BridgeV1Type, BridgeName, RawConf) ->
BridgeType = upgrade_type(BridgeV1Type),
?SLOG(debug, #{
bridge_action => create,
bridge_type => BridgeType,
@ -387,7 +393,7 @@ create(BridgeType0, BridgeName, RawConf) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf);
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf);
false ->
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],

View File

@ -78,6 +78,14 @@ external_ids(Type, Name) ->
[external_id(Type0, Name), external_id(Type, Name)]
end.
get_conf(BridgeType, BridgeName) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_conf:get_raw([actions, BridgeType, BridgeName]);
false ->
undefined
end.
%% Creates the external id for the bridge_v2 that is used by the rule actions
%% to refer to the bridge_v2
external_id(BridgeType, BridgeName) ->
@ -87,9 +95,3 @@ external_id(BridgeType, BridgeName) ->
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
get_conf(BridgeType, BridgeName) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]);
false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName])
end.

View File

@ -410,10 +410,10 @@ uninstall_bridge_v2(
CreationOpts = emqx_resource:fetch_creation_opts(Config),
ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
ok = emqx_resource:clear_metrics(BridgeV2Id),
case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of
case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of
{error, _} ->
ok;
_CombinedConfig ->
ok ->
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
@ -1053,8 +1053,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
bridge_v1_type_to_bridge_v2_type(Type) ->
emqx_action_info:bridge_v1_type_to_action_type(Type).
bridge_v2_type_to_bridge_v1_type(Type, Conf) ->
emqx_action_info:action_type_to_bridge_v1_type(Type, Conf).
bridge_v2_type_to_bridge_v1_type(ActionType, ActionConf) ->
emqx_action_info:action_type_to_bridge_v1_type(ActionType, ActionConf).
is_bridge_v2_type(Type) ->
emqx_action_info:is_action_type(Type).
@ -1065,8 +1065,8 @@ bridge_v1_list_and_transform() ->
bridge_v1_lookup_and_transform(ActionType, Name) ->
case lookup(ActionType, Name) of
{ok, #{raw_config := #{<<"connector">> := ConnectorName}} = ActionConfig} ->
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, ActionConfig),
{ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
ConnectorType = connector_type(ActionType),
@ -1244,6 +1244,8 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR
#{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}},
PreviousRawConf =/= undefined
),
%% [FIXME] this will loop through all connector types, instead pass the
%% connector type and just do it for that one
Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
FakeGlobalConfig
),

View File

@ -552,12 +552,17 @@ t_on_get_status(Config, Opts) ->
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
case ProxyHost of
undefined ->
ok;
_ ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(500),
?retry(
_Interval0 = 200,
_Attempts0 = 10,
?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
_Interval0 = 100,
_Attempts0 = 20,
?assertEqual(
{ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)
)
)
end),
%% Check that it recovers itself.
@ -565,5 +570,6 @@ t_on_get_status(Config, Opts) ->
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
)
end,
ok.

View File

@ -112,16 +112,15 @@ values({put, connector}) ->
values({put, KafkaType}) ->
maps:merge(values(common_config), values(KafkaType));
values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
connector => <<"my_kafka_producer_connector">>,
parameters => values(producer_values),
local_topic => <<"mqtt/local/topic">>,
resource_opts => #{
health_check_interval => "32s"
}
},
values(producer)
);
};
values(common_config) ->
#{
authentication => #{
@ -143,7 +142,11 @@ values(common_config) ->
};
values(producer) ->
#{
kafka => #{
kafka => values(producer_values),
local_topic => <<"mqtt/local/topic">>
};
values(producer_values) ->
#{
topic => <<"kafka-topic">>,
message => #{
key => <<"${.clientid}">>,
@ -174,8 +177,6 @@ values(producer) ->
segment_bytes => <<"100MB">>,
memory_overload_protection => true
}
},
local_topic => <<"mqtt/local/topic">>
};
values(consumer) ->
#{

View File

@ -483,11 +483,10 @@ t_failed_creation_then_fix(Config) ->
{ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
% %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok.
t_custom_timestamp(_Config) ->

View File

@ -9,7 +9,7 @@
emqx_resource,
emqx_mongodb
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_mongodb_action_info]}]},
{modules, []},
{links, []}
]}.

View File

@ -12,7 +12,9 @@
%% emqx_bridge_enterprise "callbacks"
-export([
conn_bridge_examples/1
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
%% hocon_schema callbacks
@ -27,10 +29,13 @@
%% hocon_schema API
%%=================================================================================================
%% [TODO] Namespace should be different depending on whether this is used for a
%% connector, an action or a legacy bridge type.
namespace() ->
"bridge_mongodb".
roots() ->
%% ???
[].
fields("config") ->
@ -44,6 +49,18 @@ fields("config") ->
#{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")}
)}
];
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
[
{parameters,
mk(
hoconsc:union([
ref(emqx_mongodb, "connector_" ++ T)
|| T <- ["single", "sharded", "rs"]
]),
#{required => true, desc => ?DESC("mongodb_parameters")}
)}
] ++ emqx_mongodb:fields(mongodb);
fields("creation_opts") ->
%% so far, mongodb connector does not support batching
%% but we cannot delete this field due to compatibility reasons
@ -55,12 +72,47 @@ fields("creation_opts") ->
desc => ?DESC("batch_size")
}}
]);
fields(action) ->
{mongodb,
mk(
hoconsc:map(name, ref(?MODULE, mongodb_action)),
#{desc => <<"MongoDB Action Config">>, required => false}
)};
fields(mongodb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(ref(?MODULE, action_parameters), #{
required => true, desc => ?DESC(action_parameters)
})
);
fields(action_parameters) ->
[
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
];
fields(resource_opts) ->
fields("creation_opts");
fields(mongodb_rs) ->
emqx_mongodb:fields(rs) ++ fields("config");
fields(mongodb_sharded) ->
emqx_mongodb:fields(sharded) ++ fields("config");
fields(mongodb_single) ->
emqx_mongodb:fields(single) ++ fields("config");
fields("post_connector") ->
type_and_name_fields(mongodb) ++
fields("config_connector");
fields("put_connector") ->
fields("config_connector");
fields("get_connector") ->
emqx_bridge_schema:status_fields() ++
fields("post_connector");
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++
fields("post_bridge_v2");
fields("post_bridge_v2") ->
type_and_name_fields(mongodb) ++
fields(mongodb_action);
fields("put_bridge_v2") ->
fields(mongodb_action);
fields("post_rs") ->
fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs);
fields("post_sharded") ->
@ -86,6 +138,16 @@ fields("get_single") ->
fields(mongodb_single) ++
type_and_name_fields(mongodb_single).
bridge_v2_examples(Method) ->
[
#{
<<"mongodb">> => #{
summary => <<"MongoDB Action">>,
value => action_values(Method)
}
}
].
conn_bridge_examples(Method) ->
[
#{
@ -108,16 +170,46 @@ conn_bridge_examples(Method) ->
}
].
connector_examples(Method) ->
[
#{
<<"mongodb_rs">> => #{
summary => <<"MongoDB Replica Set Connector">>,
value => connector_values(mongodb_rs, Method)
}
},
#{
<<"mongodb_sharded">> => #{
summary => <<"MongoDB Sharded Connector">>,
value => connector_values(mongodb_sharded, Method)
}
},
#{
<<"mongodb_single">> => #{
summary => <<"MongoDB Standalone Connector">>,
value => connector_values(mongodb_single, Method)
}
}
].
desc("config_connector") ->
?DESC("desc_config");
desc("config") ->
?DESC("desc_config");
desc("creation_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc(resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(mongodb_rs) ->
?DESC(mongodb_rs_conf);
desc(mongodb_sharded) ->
?DESC(mongodb_sharded_conf);
desc(mongodb_single) ->
?DESC(mongodb_single_conf);
desc(mongodb_action) ->
?DESC(mongodb_action);
desc(action_parameters) ->
?DESC(action_parameters);
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for MongoDB using `", string:to_upper(Method), "` method."];
desc(_) ->
@ -133,49 +225,102 @@ type_and_name_fields(MongoType) ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].
values(mongodb_rs = MongoType, Method) ->
TypeOpts = #{
connector_values(Type, Method) ->
lists:foldl(
fun(M1, M2) ->
maps:merge(M1, M2)
end,
#{
description => <<"My example connector">>,
parameters => mongo_type_opts(Type)
},
[
common_values(),
method_values(mongodb, Method)
]
).
action_values(Method) ->
maps:merge(
method_values(mongodb, Method),
#{
description => <<"My example action">>,
enable => true,
connector => <<"my_mongodb_connector">>,
parameters => #{
collection => <<"mycol">>
}
}
).
values(MongoType, Method) ->
maps:merge(
mongo_type_opts(MongoType),
bridge_values(MongoType, Method)
).
mongo_type_opts(mongodb_rs) ->
#{
mongo_type => <<"rs">>,
servers => <<"localhost:27017, localhost:27018">>,
w_mode => <<"safe">>,
r_mode => <<"safe">>,
replica_set_name => <<"rs">>
},
values(common, MongoType, Method, TypeOpts);
values(mongodb_sharded = MongoType, Method) ->
TypeOpts = #{
};
mongo_type_opts(mongodb_sharded) ->
#{
mongo_type => <<"sharded">>,
servers => <<"localhost:27017, localhost:27018">>,
w_mode => <<"safe">>
},
values(common, MongoType, Method, TypeOpts);
values(mongodb_single = MongoType, Method) ->
TypeOpts = #{
};
mongo_type_opts(mongodb_single) ->
#{
mongo_type => <<"single">>,
server => <<"localhost:27017">>,
w_mode => <<"safe">>
},
values(common, MongoType, Method, TypeOpts).
}.
values(common, MongoType, Method, TypeOpts) ->
MongoTypeBin = atom_to_binary(MongoType),
Common = #{
name => <<MongoTypeBin/binary, "_demo">>,
type => MongoTypeBin,
bridge_values(Type, _Method) ->
%% [FIXME] _Method makes a difference since PUT doesn't allow name and type
%% for connectors.
TypeBin = atom_to_binary(Type),
maps:merge(
#{
name => <<TypeBin/binary, "_demo">>,
type => TypeBin,
collection => <<"mycol">>
},
common_values()
).
common_values() ->
#{
enable => true,
collection => <<"mycol">>,
database => <<"mqtt">>,
srv_record => false,
pool_size => 8,
username => <<"myuser">>,
password => <<"******">>
},
MethodVals = method_values(MongoType, Method),
Vals0 = maps:merge(MethodVals, Common),
maps:merge(Vals0, TypeOpts).
}.
method_values(MongoType, _) ->
ConnectorType =
case MongoType of
mongodb_rs -> <<"rs">>;
mongodb_sharded -> <<"sharded">>;
mongodb_single -> <<"single">>
end,
#{mongo_type => ConnectorType}.
method_values(Type, post) ->
TypeBin = atom_to_binary(Type),
#{
name => <<TypeBin/binary, "_demo">>,
type => TypeBin
};
method_values(Type, get) ->
maps:merge(
method_values(Type, post),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
method_values(_Type, put) ->
#{}.

View File

@ -0,0 +1,95 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_mongodb_action_info).
-behaviour(emqx_action_info).
%% behaviour callbacks
-export([
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
connector_action_config_to_bridge_v1_config/2,
action_type_name/0,
bridge_v1_type_name/0,
connector_type_name/0,
schema_module/0
]).
%% dynamic callback
-export([
bridge_v1_type_name_fun/1
]).
-import(emqx_utils_conv, [bin/1]).
-define(SCHEMA_MODULE, emqx_bridge_mongodb).
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
fix_v1_type(
maps:merge(
maps:without(
[<<"connector">>],
map_unindent(<<"parameters">>, ActionConfig)
),
map_unindent(<<"parameters">>, ConnectorConfig)
)
).
fix_v1_type(#{<<"mongo_type">> := MongoType} = Conf) ->
Conf#{<<"type">> => v1_type(MongoType)}.
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(mongodb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
ActionConfig#{<<"connector">> => ConnectorName}.
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(mongodb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ConnectorTopLevelKeys = schema_keys("config_connector"),
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
map_indent(<<"parameters">>, IndentKeys, Conf0).
bridge_v1_type_name() ->
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
action_type_name() -> mongodb.
connector_type_name() -> mongodb.
schema_module() -> ?SCHEMA_MODULE.
bridge_v1_type_names() -> [mongodb_rs, mongodb_sharded, mongodb_single].
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"mongo_type">> := MongoType}}, _}) ->
v1_type(MongoType).
v1_type(<<"rs">>) -> mongodb_rs;
v1_type(<<"sharded">>) -> mongodb_sharded;
v1_type(<<"single">>) -> mongodb_single.
map_unindent(Key, Map) ->
maps:merge(
maps:get(Key, Map),
maps:remove(Key, Map)
).
map_indent(IndentKey, PickKeys, Map) ->
maps:put(
IndentKey,
maps:with(PickKeys, Map),
maps:without(PickKeys, Map)
).
schema_keys(Name) ->
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

View File

@ -6,16 +6,19 @@
-behaviour(emqx_resource).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% `emqx_resource' API
-export([
on_remove_channel/3,
callback_mode/0,
on_start/2,
on_stop/2,
on_add_channel/4,
on_get_channel_status/3,
on_get_channels/1,
on_get_status/2,
on_query/3,
on_get_status/2
on_start/2,
on_stop/2
]).
%%========================================================================================
@ -24,44 +27,94 @@
callback_mode() -> emqx_mongodb:callback_mode().
on_start(InstanceId, Config) ->
on_add_channel(
_InstanceId,
#{channels := Channels} = OldState,
ChannelId,
#{parameters := Parameters} = ChannelConfig0
) ->
PayloadTemplate0 = maps:get(payload_template, Parameters, undefined),
PayloadTemplate = preprocess_template(PayloadTemplate0),
CollectionTemplateSource = maps:get(collection, Parameters),
CollectionTemplate = preprocess_template(CollectionTemplateSource),
ChannelConfig = maps:merge(
Parameters,
ChannelConfig0#{
payload_template => PayloadTemplate,
collection_template => CollectionTemplate
}
),
NewState = OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)},
{ok, NewState}.
on_get_channel_status(InstanceId, _ChannelId, State) ->
case on_get_status(InstanceId, State) of
connected ->
connected;
_ ->
connecting
end.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
emqx_mongodb:on_get_status(InstanceId, ConnectorState).
on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
#{
payload_template := PayloadTemplate,
collection_template := CollectionTemplate
} = ChannelState0 = maps:get(Channel, Channels),
ChannelState = ChannelState0#{
collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
},
Message = render_message(PayloadTemplate, Message0),
Res = emqx_mongodb:on_query(
InstanceId,
{Channel, Message},
maps:merge(ConnectorState, ChannelState)
),
?tp(mongo_bridge_connector_on_query_return, #{instance_id => InstanceId, result => Res}),
Res;
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
NewState = State#{channels => maps:remove(ChannelId, Channels)},
{ok, NewState}.
on_start(InstanceId, Config0) ->
Config = config_transform(Config0),
case emqx_mongodb:on_start(InstanceId, Config) of
{ok, ConnectorState} ->
PayloadTemplate0 = maps:get(payload_template, Config, undefined),
PayloadTemplate = preprocess_template(PayloadTemplate0),
CollectionTemplateSource = maps:get(collection, Config),
CollectionTemplate = preprocess_template(CollectionTemplateSource),
State = #{
payload_template => PayloadTemplate,
collection_template => CollectionTemplate,
connector_state => ConnectorState
connector_state => ConnectorState,
channels => #{}
},
{ok, State};
Error ->
Error
end.
config_transform(#{parameters := #{mongo_type := MongoType} = Parameters} = Config) ->
maps:put(
type,
connector_type(MongoType),
maps:merge(
maps:remove(parameters, Config),
Parameters
)
).
connector_type(rs) -> mongodb_rs;
connector_type(sharded) -> mongodb_sharded;
connector_type(single) -> mongodb_single.
on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
emqx_mongodb:on_stop(InstanceId, ConnectorState).
on_query(InstanceId, {send_message, Message0}, State) ->
#{
payload_template := PayloadTemplate,
collection_template := CollectionTemplate,
connector_state := ConnectorState
} = State,
NewConnectorState = ConnectorState#{
collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
},
Message = render_message(PayloadTemplate, Message0),
Res = emqx_mongodb:on_query(InstanceId, {send_message, Message}, NewConnectorState),
?tp(mongo_bridge_connector_on_query_return, #{result => Res}),
Res;
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
emqx_mongodb:on_get_status(InstanceId, ConnectorState).
ok = emqx_mongodb:on_stop(InstanceId, ConnectorState),
?tp(mongodb_stopped, #{instance_id => InstanceId}),
ok.
%%========================================================================================
%% Helper fns

View File

@ -132,7 +132,17 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_mongodb, emqx_bridge, emqx_rule_engine, emqx_conf]),
ok = emqx_common_test_helpers:stop_apps(
[
emqx_management,
emqx_bridge_mongodb,
emqx_mongodb,
emqx_bridge,
emqx_connector,
emqx_rule_engine,
emqx_conf
]
),
ok.
init_per_testcase(_Testcase, Config) ->
@ -144,6 +154,7 @@ init_per_testcase(_Testcase, Config) ->
end_per_testcase(_Testcase, Config) ->
clear_db(Config),
delete_bridge(Config),
[] = emqx_connector:list(),
snabbkaffe:stop(),
ok.
@ -157,9 +168,17 @@ start_apps() ->
%% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([
emqx_conf, emqx_rule_engine, emqx_bridge, emqx_mongodb
]).
ok = emqx_common_test_helpers:start_apps(
[
emqx_conf,
emqx_rule_engine,
emqx_connector,
emqx_bridge,
emqx_mongodb,
emqx_bridge_mongodb,
emqx_management
]
).
ensure_loaded() ->
_ = application:load(emqtt),
@ -198,6 +217,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
"\n w_mode = safe"
"\n use_legacy_protocol = auto"
"\n database = mqtt"
"\n mongo_type = rs"
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
@ -224,6 +244,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
"\n w_mode = safe"
"\n use_legacy_protocol = auto"
"\n database = mqtt"
"\n mongo_type = sharded"
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
@ -253,6 +274,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
"\n auth_source = ~s"
"\n username = ~s"
"\n password = \"file://~s\""
"\n mongo_type = single"
"\n resource_opts = {"
"\n query_mode = ~s"
"\n worker_pool_size = 1"
@ -290,13 +312,17 @@ create_bridge(Config, Overrides) ->
delete_bridge(Config) ->
Type = mongo_type_bin(?config(mongo_type, Config)),
Name = ?config(mongo_name, Config),
emqx_bridge:remove(Type, Name).
emqx_bridge:check_deps_and_remove(Type, Name, [connector, rule_actions]).
create_bridge_http(Params) ->
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
case
emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, #{
return_all => true
})
of
{ok, {{_, 201, _}, _, Body}} -> {ok, emqx_utils_json:decode(Body, [return_maps])};
Error -> Error
end.
@ -564,8 +590,8 @@ t_get_status_server_selection_too_short(Config) ->
ok.
t_use_legacy_protocol_option(Config) ->
ResourceID = resource_id(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
ResourceID = resource_id(Config),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,

View File

@ -0,0 +1,232 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_mongodb_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, mongodb).
-define(BRIDGE_TYPE_BIN, <<"mongodb">>).
-define(CONNECTOR_TYPE, mongodb).
-define(CONNECTOR_TYPE_BIN, <<"mongodb">>).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_utils_conv, [bin/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge,
emqx_bridge_mongodb,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, Api} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{api, Api},
{mongo_host, MongoHost},
{mongo_port, MongoPort}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_mongo);
_ ->
{skip, no_mongo}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
common_init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")),
Username = bin(os:getenv("MONGO_USERNAME", "")),
Password = bin(os:getenv("MONGO_PASSWORD", "")),
Passfile = filename:join(?config(priv_dir, Config), "passfile"),
ok = file:write_file(Passfile, Password),
NConfig = [
{mongo_authsource, AuthSource},
{mongo_username, Username},
{mongo_password, Password},
{mongo_passfile, Passfile}
| Config
],
ConnectorConfig = connector_config(Name, NConfig),
BridgeConfig = bridge_config(Name, Name),
ok = snabbkaffe:start_trace(),
[
{connector_type, ?CONNECTOR_TYPE},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, Name},
{bridge_config, BridgeConfig}
| NConfig
].
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
connector_config(Name, Config) ->
MongoHost = ?config(mongo_host, Config),
MongoPort = ?config(mongo_port, Config),
AuthSource = ?config(mongo_authsource, Config),
Username = ?config(mongo_username, Config),
PassFile = ?config(mongo_passfile, Config),
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"database">> => <<"mqtt">>,
<<"parameters">> =>
#{
<<"mongo_type">> => <<"single">>,
<<"server">> => iolist_to_binary([MongoHost, ":", integer_to_binary(MongoPort)]),
<<"w_mode">> => <<"safe">>
},
<<"pool_size">> => 8,
<<"srv_record">> => false,
<<"username">> => Username,
<<"password">> => iolist_to_binary(["file://", PassFile]),
<<"auth_source">> => AuthSource
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_connector_config(InnerConfigMap, Name).
parse_and_check_connector_config(InnerConfigMap, Name) ->
TypeBin = ?CONNECTOR_TYPE_BIN,
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
required => false, atom_key => false
}),
ct:pal("parsed config: ~p", [Config]),
InnerConfigMap.
bridge_config(Name, ConnectorId) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"connector">> => ConnectorId,
<<"parameters">> =>
#{},
<<"local_topic">> => <<"t/aeh">>
%%,
},
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
parse_and_check_bridge_config(InnerConfigMap, Name).
%% check it serializes correctly
serde_roundtrip(InnerConfigMap0) ->
IOList = hocon_pp:do(InnerConfigMap0, #{}),
{ok, InnerConfigMap} = hocon:binary(IOList),
InnerConfigMap.
parse_and_check_bridge_config(InnerConfigMap, Name) ->
TypeBin = ?BRIDGE_TYPE_BIN,
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
InnerConfigMap.
shared_secret_path() ->
os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
shared_secret(client_keyfile) ->
filename:join([shared_secret_path(), "client.key"]);
shared_secret(client_certfile) ->
filename:join([shared_secret_path(), "client.crt"]);
shared_secret(client_cacertfile) ->
filename:join([shared_secret_path(), "ca.crt"]);
shared_secret(rig_keytab) ->
filename:join([shared_secret_path(), "rig.keytab"]).
make_message() ->
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
#{
clientid => BinTime,
payload => Payload,
timestamp => Time
}.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, mongodb_stopped),
ok.
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_sync_query(Config) ->
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertEqual(ok, Res) end,
mongo_bridge_connector_on_query_return
),
ok.

View File

@ -20,8 +20,8 @@
resource_type(Type) when is_binary(Type) ->
resource_type(binary_to_atom(Type, utf8));
%% We use AEH's Kafka interface.
resource_type(azure_event_hub_producer) ->
%% We use AEH's Kafka interface.
emqx_bridge_kafka_impl_producer;
resource_type(confluent_producer) ->
emqx_bridge_kafka_impl_producer;
@ -29,6 +29,8 @@ resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer;
resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer;
resource_type(mongodb) ->
emqx_bridge_mongodb_connector;
resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) ->
@ -83,6 +85,14 @@ connector_structs() ->
required => false
}
)},
{mongodb,
mk(
hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
#{
desc => <<"MongoDB Connector Config">>,
required => false
}
)},
{syskeeper_forwarder,
mk(
hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
@ -119,6 +129,7 @@ schema_modules() ->
emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_kafka,
emqx_bridge_mongodb,
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy
].
@ -133,12 +144,13 @@ api_schemas(Method) ->
api_ref(
emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(
emqx_bridge_gcp_pubsub_producer_schema,
<<"gcp_pubsub_producer">>,
Method ++ "_connector"
),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
].

View File

@ -68,8 +68,9 @@ enterprise_fields_connectors() -> [].
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) -> [].
@ -266,8 +267,9 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
RawConfigSoFar1
),
%% Add action
ActionType = emqx_action_info:bridge_v1_type_to_action_type(to_bin(BridgeType)),
RawConfigSoFar3 = emqx_utils_maps:deep_put(
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
[actions_config_name(), to_bin(ActionType), BridgeName],
RawConfigSoFar2,
ActionMap
),
@ -286,12 +288,6 @@ transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
),
NewRawConf.
%% v1 uses 'kafka' as bridge type v2 uses 'kafka_producer'
maybe_rename(kafka) ->
kafka_producer;
maybe_rename(Name) ->
Name.
%%======================================================================================
%% HOCON Schema Callbacks
%%======================================================================================

View File

@ -68,19 +68,10 @@ roots() ->
}}
].
fields(single) ->
[
{mongo_type, #{
type => single,
default => single,
desc => ?DESC("single_mongo_type")
}},
{server, server()},
{w_mode, fun w_mode/1}
] ++ mongo_fields();
fields(rs) ->
fields("connector_rs") ->
[
{mongo_type, #{
required => true,
type => rs,
default => rs,
desc => ?DESC("rs_mongo_type")
@ -89,17 +80,51 @@ fields(rs) ->
{w_mode, fun w_mode/1},
{r_mode, fun r_mode/1},
{replica_set_name, fun replica_set_name/1}
] ++ mongo_fields();
fields(sharded) ->
];
fields("connector_sharded") ->
[
{mongo_type, #{
required => true,
type => sharded,
default => sharded,
desc => ?DESC("sharded_mongo_type")
}},
{servers, servers()},
{w_mode, fun w_mode/1}
] ++ mongo_fields();
];
fields("connector_single") ->
[
{mongo_type, #{
required => true,
type => single,
default => single,
desc => ?DESC("single_mongo_type")
}},
{server, server()},
{w_mode, fun w_mode/1}
];
fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
fields("connector_" ++ atom_to_list(Type)) ++ fields(mongodb);
fields(mongodb) ->
[
{srv_record, fun srv_record/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1},
{password, emqx_connector_schema_lib:password_field()},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{
type => binary(),
required => false,
desc => ?DESC("auth_source")
}},
{database, fun emqx_connector_schema_lib:database/1},
{topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
] ++
emqx_connector_schema_lib:ssl_fields();
fields(topology) ->
[
{pool_size,
@ -129,6 +154,12 @@ fields(topology) ->
{min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
].
desc("connector_single") ->
?DESC("desc_single");
desc("connector_rs") ->
?DESC("desc_rs");
desc("connector_sharded") ->
?DESC("desc_sharded");
desc(single) ->
?DESC("desc_single");
desc(rs) ->
@ -140,27 +171,6 @@ desc(topology) ->
desc(_) ->
undefined.
mongo_fields() ->
[
{srv_record, fun srv_record/1},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1},
{password, emqx_connector_schema_lib:password_field()},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{
type => binary(),
required => false,
desc => ?DESC("auth_source")
}},
{database, fun emqx_connector_schema_lib:database/1},
{topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
] ++
emqx_connector_schema_lib:ssl_fields().
%% ===================================================================
callback_mode() -> always_sync.
@ -236,7 +246,7 @@ on_stop(InstId, _State) ->
on_query(
InstId,
{send_message, Document},
{_ChannelId, Document},
#{pool_name := PoolName, collection := Collection} = State
) ->
Request = {insert, Collection, Document},

View File

@ -48,6 +48,12 @@ mongodb_single_conf.desc:
mongodb_single_conf.label:
"""MongoDB (Standalone) Configuration"""
mongodb_parameters.label:
"""MongoDB Type Specific Parameters"""
mongodb_parameters.desc:
"""Set of parameters specific for the given type of this MongoDB connector, `mongo_type` can be one of `single` (Standalone), `sharded` (Sharded) or `rs` (Replica Set)."""
payload_template.desc:
"""The template for formatting the outgoing messages. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."""
@ -59,4 +65,19 @@ batch_size.desc:
batch_size.label:
"""Batch Size"""
action_parameters.label:
"""Action Parameters"""
action_parameters.desc:
"""Additional parameters specific to this action type"""
mongodb_action.label:
"""MongoDB Action"""
mongodb_action.desc:
"""Action to interact with a MongoDB connector"""
mqtt_topic.desc:
"""MQTT topic or topic filter as data source (bridge input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in MongoDB."""
mqtt_topic.label:
"""Source MQTT Topic"""
}