fix(kinesis): replace default payload template

Fixes https://emqx.atlassian.net/browse/EMQX-10766
This commit is contained in:
Paulo Zulato 2023-08-15 14:10:38 -03:00
parent e6fc37013f
commit afffdbbaa1
3 changed files with 44 additions and 3 deletions

View File

@ -83,7 +83,7 @@ fields(producer) ->
sc(
binary(),
#{
default => <<>>,
default => <<"${.}">>,
desc => ?DESC("payload_template")
}
)},

View File

@ -228,13 +228,17 @@ create_bridge_http(Config, KinesisConfigOverrides) ->
Res.
create_bridge(Config) ->
create_bridge(Config, _KinesisConfigOverrides = #{}).
create_bridge(Config, #{}, []).
create_bridge(Config, KinesisConfigOverrides) ->
create_bridge(Config, KinesisConfigOverrides, []).
create_bridge(Config, KinesisConfigOverrides, Removes) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config),
KinesisConfig0 = ?config(kinesis_config, Config),
KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides),
KinesisConfig1 = emqx_utils_maps:deep_merge(KinesisConfig0, KinesisConfigOverrides),
KinesisConfig = emqx_utils_maps:deep_remove(Removes, KinesisConfig1),
ct:pal("creating bridge: ~p", [KinesisConfig]),
Res = emqx_bridge:create(TypeBin, Name, KinesisConfig),
ct:pal("bridge creation result: ~p", [Res]),
@ -862,3 +866,39 @@ t_access_denied(Config) ->
end
),
ok.
t_empty_payload_template(Config) ->
ResourceId = ?config(resource_id, Config),
TelemetryTable = ?config(telemetry_table, Config),
Removes = [<<"payload_template">>],
?assertMatch({ok, _}, create_bridge(Config, #{}, Removes)),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
emqx_common_test_helpers:on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
assert_empty_metrics(ResourceId),
ShardIt = get_shard_iterator(Config),
Payload = <<"payload">>,
Message = emqx_message:make(?TOPIC, Payload),
emqx:publish(Message),
%% to avoid test flakiness
wait_telemetry_event(TelemetryTable, success, ResourceId),
wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 0, 500),
assert_metrics(
#{
dropped => 0,
failed => 0,
inflight => 0,
matched => 1,
queuing => 0,
retried => 0,
success => 1
},
ResourceId
),
Record = wait_record(Config, ShardIt, 100, 10),
Data = proplists:get_value(<<"Data">>, Record),
?assertMatch(
#{<<"payload">> := <<"payload">>, <<"topic">> := ?TOPIC},
emqx_utils_json:decode(Data, [return_maps])
),
ok.

View File

@ -0,0 +1 @@
The default payload template for Kinesis was updated to store the entire message when no template is provided.