Merge pull request #10294 from kjellwinblad/kjell/feat/collection_var_syntax_mongodb/EMQX-9246
feat: (MongoDB bridge) use ${var} syntax in MongoDB collection field
This commit is contained in:
commit
58898ea11d
|
@ -0,0 +1 @@
|
||||||
|
When configuring a MongoDB bridge, you can now use the ${var} syntax to reference fields in the message payload within the collection field. This enables you to select the collection to insert data into dynamically.
|
|
@ -0,0 +1 @@
|
||||||
|
在配置 MongoDB 桥时,现在可以使用 ${var} 语法来引用消息负载中的字段,以便动态选择要插入的集合。
|
|
@ -26,7 +26,8 @@ group_tests() ->
|
||||||
[
|
[
|
||||||
t_setup_via_config_and_publish,
|
t_setup_via_config_and_publish,
|
||||||
t_setup_via_http_api_and_publish,
|
t_setup_via_http_api_and_publish,
|
||||||
t_payload_template
|
t_payload_template,
|
||||||
|
t_collection_template
|
||||||
].
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
|
@ -302,3 +303,24 @@ t_payload_template(Config) ->
|
||||||
find_all(Config)
|
find_all(Config)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_collection_template(Config) ->
|
||||||
|
{ok, _} = create_bridge(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>,
|
||||||
|
<<"collection">> => <<"${mycollectionvar}">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
Val = erlang:unique_integer(),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
ok = send_message(Config, #{
|
||||||
|
key => Val,
|
||||||
|
clientid => ClientId,
|
||||||
|
mycollectionvar => <<"mycol">>
|
||||||
|
}),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, [#{<<"foo">> := ClientId}]},
|
||||||
|
find_all(Config)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -35,8 +35,11 @@ on_start(InstanceId, Config) ->
|
||||||
{ok, ConnectorState} ->
|
{ok, ConnectorState} ->
|
||||||
PayloadTemplate0 = maps:get(payload_template, Config, undefined),
|
PayloadTemplate0 = maps:get(payload_template, Config, undefined),
|
||||||
PayloadTemplate = preprocess_template(PayloadTemplate0),
|
PayloadTemplate = preprocess_template(PayloadTemplate0),
|
||||||
|
CollectionTemplateSource = maps:get(collection, Config),
|
||||||
|
CollectionTemplate = preprocess_template(CollectionTemplateSource),
|
||||||
State = #{
|
State = #{
|
||||||
payload_template => PayloadTemplate,
|
payload_template => PayloadTemplate,
|
||||||
|
collection_template => CollectionTemplate,
|
||||||
connector_state => ConnectorState
|
connector_state => ConnectorState
|
||||||
},
|
},
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
@ -50,10 +53,14 @@ on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
||||||
on_query(InstanceId, {send_message, Message0}, State) ->
|
on_query(InstanceId, {send_message, Message0}, State) ->
|
||||||
#{
|
#{
|
||||||
payload_template := PayloadTemplate,
|
payload_template := PayloadTemplate,
|
||||||
|
collection_template := CollectionTemplate,
|
||||||
connector_state := ConnectorState
|
connector_state := ConnectorState
|
||||||
} = State,
|
} = State,
|
||||||
|
NewConnectorState = ConnectorState#{
|
||||||
|
collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
|
||||||
|
},
|
||||||
Message = render_message(PayloadTemplate, Message0),
|
Message = render_message(PayloadTemplate, Message0),
|
||||||
emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, ConnectorState);
|
emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState);
|
||||||
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
||||||
emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).
|
emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue