feat(mongodb): add `payload_template` field for bridge (e5.0)
https://emqx.atlassian.net/browse/EMQX-8705 Adds a `payload_template` fields that allows users to customize the payload to publish to MongoDB.
This commit is contained in:
parent
7f5150b2e9
commit
b9f258b737
|
@ -86,4 +86,15 @@ emqx_ee_bridge_mongodb {
|
||||||
zh: "桥接名称"
|
zh: "桥接名称"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
payload_template {
|
||||||
|
desc {
|
||||||
|
en: "The template for formatting the outgoing messages. If undefined, will send all the available context in JSON format."
|
||||||
|
zh: "用于格式化外发信息的模板。 如果未定义,将以JSON格式发送所有可用的上下文。"
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Payload template"
|
||||||
|
zh: "有效载荷模板"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,9 +57,9 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, u
|
||||||
resource_type(kafka) -> emqx_bridge_impl_kafka;
|
resource_type(kafka) -> emqx_bridge_impl_kafka;
|
||||||
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
|
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
|
||||||
resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
|
resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
|
||||||
resource_type(mongodb_rs) -> emqx_connector_mongo;
|
resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mongodb_sharded) -> emqx_connector_mongo;
|
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mongodb_single) -> emqx_connector_mongo;
|
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mysql) -> emqx_connector_mysql;
|
resource_type(mysql) -> emqx_connector_mysql;
|
||||||
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
|
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
|
||||||
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
|
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
|
||||||
|
|
|
@ -37,7 +37,8 @@ roots() ->
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
||||||
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}
|
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
|
||||||
|
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
|
||||||
];
|
];
|
||||||
fields(mongodb_rs) ->
|
fields(mongodb_rs) ->
|
||||||
emqx_connector_mongo:fields(rs) ++ fields("config");
|
emqx_connector_mongo:fields(rs) ++ fields("config");
|
||||||
|
|
|
@ -25,7 +25,8 @@ all() ->
|
||||||
group_tests() ->
|
group_tests() ->
|
||||||
[
|
[
|
||||||
t_setup_via_config_and_publish,
|
t_setup_via_config_and_publish,
|
||||||
t_setup_via_http_api_and_publish
|
t_setup_via_http_api_and_publish,
|
||||||
|
t_payload_template
|
||||||
].
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
|
@ -196,9 +197,14 @@ parse_and_check(ConfigString, Type, Name) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
create_bridge(Config) ->
|
create_bridge(Config) ->
|
||||||
|
create_bridge(Config, _Overrides = #{}).
|
||||||
|
|
||||||
|
create_bridge(Config, Overrides) ->
|
||||||
Type = mongo_type_bin(?config(mongo_type, Config)),
|
Type = mongo_type_bin(?config(mongo_type, Config)),
|
||||||
Name = ?config(mongo_name, Config),
|
Name = ?config(mongo_name, Config),
|
||||||
MongoConfig = ?config(mongo_config, Config),
|
MongoConfig0 = ?config(mongo_config, Config),
|
||||||
|
MongoConfig = emqx_map_lib:deep_merge(MongoConfig0, Overrides),
|
||||||
|
ct:pal("creating ~p bridge with config:\n ~p", [Type, MongoConfig]),
|
||||||
emqx_bridge:create(Type, Name, MongoConfig).
|
emqx_bridge:create(Type, Name, MongoConfig).
|
||||||
|
|
||||||
delete_bridge(Config) ->
|
delete_bridge(Config) ->
|
||||||
|
@ -219,7 +225,8 @@ clear_db(Config) ->
|
||||||
Name = ?config(mongo_name, Config),
|
Name = ?config(mongo_name, Config),
|
||||||
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
|
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
|
||||||
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
|
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
|
||||||
{ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
|
{ok, _, #{state := #{connector_state := #{poolname := PoolName}}}} =
|
||||||
|
emqx_resource:get_instance(ResourceID),
|
||||||
Selector = #{},
|
Selector = #{},
|
||||||
{true, _} = ecpool:pick_and_do(
|
{true, _} = ecpool:pick_and_do(
|
||||||
PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover
|
PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover
|
||||||
|
@ -275,3 +282,14 @@ t_setup_via_http_api_and_publish(Config) ->
|
||||||
find_all(Config)
|
find_all(Config)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_payload_template(Config) ->
|
||||||
|
{ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}),
|
||||||
|
Val = erlang:unique_integer(),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
ok = send_message(Config, #{key => Val, clientid => ClientId}),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, [#{<<"foo">> := ClientId}]},
|
||||||
|
find_all(Config)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ee_connector_mongodb).
|
||||||
|
|
||||||
|
-behaviour(emqx_resource).
|
||||||
|
|
||||||
|
-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
%% `emqx_resource' API
|
||||||
|
-export([
|
||||||
|
callback_mode/0,
|
||||||
|
is_buffer_supported/0,
|
||||||
|
on_start/2,
|
||||||
|
on_stop/2,
|
||||||
|
on_query/3,
|
||||||
|
on_get_status/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%========================================================================================
|
||||||
|
%% `emqx_resource' API
|
||||||
|
%%========================================================================================
|
||||||
|
|
||||||
|
callback_mode() -> emqx_connector_mongo:callback_mode().
|
||||||
|
|
||||||
|
is_buffer_supported() -> false.
|
||||||
|
|
||||||
|
on_start(InstanceId, Config) ->
|
||||||
|
case emqx_connector_mongo:on_start(InstanceId, Config) of
|
||||||
|
{ok, ConnectorState} ->
|
||||||
|
PayloadTemplate0 = maps:get(payload_template, Config, undefined),
|
||||||
|
PayloadTemplate = preprocess_template(PayloadTemplate0),
|
||||||
|
State = #{
|
||||||
|
payload_template => PayloadTemplate,
|
||||||
|
connector_state => ConnectorState
|
||||||
|
},
|
||||||
|
{ok, State};
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
||||||
|
emqx_connector_mongo:on_stop(InstanceId, ConnectorState).
|
||||||
|
|
||||||
|
on_query(InstanceId, {send_message, Message0}, State) ->
|
||||||
|
#{
|
||||||
|
payload_template := PayloadTemplate,
|
||||||
|
connector_state := ConnectorState
|
||||||
|
} = State,
|
||||||
|
Message = render_message(PayloadTemplate, Message0),
|
||||||
|
emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, ConnectorState);
|
||||||
|
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
||||||
|
emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).
|
||||||
|
|
||||||
|
on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
||||||
|
emqx_connector_mongo:on_get_status(InstanceId, ConnectorState).
|
||||||
|
|
||||||
|
%%========================================================================================
|
||||||
|
%% Helper fns
|
||||||
|
%%========================================================================================
|
||||||
|
|
||||||
|
preprocess_template(undefined = _PayloadTemplate) ->
|
||||||
|
undefined;
|
||||||
|
preprocess_template(PayloadTemplate) ->
|
||||||
|
emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate).
|
||||||
|
|
||||||
|
render_message(undefined = _PayloadTemplate, Message) ->
|
||||||
|
Message;
|
||||||
|
render_message(PayloadTemplate, Message) ->
|
||||||
|
%% Note: mongo expects a map as a document, so the rendered result
|
||||||
|
%% must be JSON-serializable
|
||||||
|
Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message),
|
||||||
|
emqx_json:decode(Rendered, [return_maps]).
|
Loading…
Reference in New Issue