diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf index f8009f0a4..4880148f9 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf @@ -86,4 +86,15 @@ emqx_ee_bridge_mongodb { zh: "桥接名称" } } + + payload_template { + desc { + en: "The template for formatting the outgoing messages. If undefined, will send all the available context in JSON format." + zh: "用于格式化外发信息的模板。 如果未定义,将以JSON格式发送所有可用的上下文。" + } + label: { + en: "Payload template" + zh: "有效载荷模板" + } + } } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index d0099db1c..f47829870 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -57,9 +57,9 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, u resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub; -resource_type(mongodb_rs) -> emqx_connector_mongo; -resource_type(mongodb_sharded) -> emqx_connector_mongo; -resource_type(mongodb_single) -> emqx_connector_mongo; +resource_type(mongodb_rs) -> emqx_ee_connector_mongodb; +resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb; +resource_type(mongodb_single) -> emqx_ee_connector_mongodb; resource_type(mysql) -> emqx_connector_mysql; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb; diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index 516c75f65..bb4082681 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -37,7 +37,8 @@ roots() -> fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, - {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})} + {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, + {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} ]; fields(mongodb_rs) -> emqx_connector_mongo:fields(rs) ++ fields("config"); diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl index fb8f1fcc3..7e44347f3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl @@ -25,7 +25,8 @@ all() -> group_tests() -> [ t_setup_via_config_and_publish, - t_setup_via_http_api_and_publish + t_setup_via_http_api_and_publish, + t_payload_template ]. groups() -> @@ -196,9 +197,14 @@ parse_and_check(ConfigString, Type, Name) -> Config. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> Type = mongo_type_bin(?config(mongo_type, Config)), Name = ?config(mongo_name, Config), - MongoConfig = ?config(mongo_config, Config), + MongoConfig0 = ?config(mongo_config, Config), + MongoConfig = emqx_map_lib:deep_merge(MongoConfig0, Overrides), + ct:pal("creating ~p bridge with config:\n ~p", [Type, MongoConfig]), emqx_bridge:create(Type, Name, MongoConfig). delete_bridge(Config) -> @@ -219,7 +225,8 @@ clear_db(Config) -> Name = ?config(mongo_name, Config), #{<<"collection">> := Collection} = ?config(mongo_config, Config), ResourceID = emqx_bridge_resource:resource_id(Type, Name), - {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + {ok, _, #{state := #{connector_state := #{poolname := PoolName}}}} = + emqx_resource:get_instance(ResourceID), Selector = #{}, {true, _} = ecpool:pick_and_do( PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover @@ -275,3 +282,14 @@ t_setup_via_http_api_and_publish(Config) -> find_all(Config) ), ok. + +t_payload_template(Config) -> + {ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}), + Val = erlang:unique_integer(), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + ok = send_message(Config, #{key => Val, clientid => ClientId}), + ?assertMatch( + {ok, [#{<<"foo">> := ClientId}]}, + find_all(Config) + ), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl new file mode 100644 index 000000000..b1327fef6 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -0,0 +1,78 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_mongodb). + +-behaviour(emqx_resource). + +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_get_status/2 +]). + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> emqx_connector_mongo:callback_mode(). + +is_buffer_supported() -> false. + +on_start(InstanceId, Config) -> + case emqx_connector_mongo:on_start(InstanceId, Config) of + {ok, ConnectorState} -> + PayloadTemplate0 = maps:get(payload_template, Config, undefined), + PayloadTemplate = preprocess_template(PayloadTemplate0), + State = #{ + payload_template => PayloadTemplate, + connector_state => ConnectorState + }, + {ok, State}; + Error -> + Error + end. + +on_stop(InstanceId, _State = #{connector_state := ConnectorState}) -> + emqx_connector_mongo:on_stop(InstanceId, ConnectorState). + +on_query(InstanceId, {send_message, Message0}, State) -> + #{ + payload_template := PayloadTemplate, + connector_state := ConnectorState + } = State, + Message = render_message(PayloadTemplate, Message0), + emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, ConnectorState); +on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> + emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState). + +on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) -> + emqx_connector_mongo:on_get_status(InstanceId, ConnectorState). + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +preprocess_template(undefined = _PayloadTemplate) -> + undefined; +preprocess_template(PayloadTemplate) -> + emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate). + +render_message(undefined = _PayloadTemplate, Message) -> + Message; +render_message(PayloadTemplate, Message) -> + %% Note: mongo expects a map as a document, so the rendered result + %% must be JSON-serializable + Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message), + emqx_json:decode(Rendered, [return_maps]).