fix(s3-aggreg): ensure action works in Rule SQL contexts

This commit is contained in:
Andrew Mayorov 2024-04-29 10:41:25 +02:00
parent a1a313d992
commit 83366cbed0
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 56 additions and 6 deletions

View File

@ -57,7 +57,7 @@ close(#csv{}) ->
%%
mk_columns(Record, #csv{order = ColumnOrder}) ->
Columns = lists:sort(maps:keys(Record)),
Columns = [emqx_utils_conv:bin(C) || C <- lists:sort(maps:keys(Record))],
Unoredered = Columns -- ColumnOrder,
ColumnOrder ++ Unoredered.
@ -81,11 +81,11 @@ emit_row(#{}, [], #csv{delimiter = Delim}) ->
[Delim].
emit_cell(Column, Record, CSV) ->
case maps:get(Column, Record, undefined) of
undefined ->
_Empty = "";
Value ->
encode_cell(emqx_template:to_string(Value), CSV)
case emqx_template:lookup(Column, Record) of
{ok, Value} ->
encode_cell(emqx_template:to_string(Value), CSV);
{error, undefined} ->
_Empty = ""
end.
encode_cell(V, #csv{quoting_mp = MP}) ->

View File

@ -193,6 +193,56 @@ t_aggreg_upload(Config) ->
erl_csv:decode(Content)
).
t_aggreg_upload_rule(Config) ->
Bucket = ?config(s3_bucket, Config),
BridgeName = ?config(bridge_name, Config),
ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
%% Create a bridge with the sample configuration and a simple SQL rule.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
?assertMatch(
{ok, _Rule},
emqx_bridge_v2_testlib:create_rule_and_action_http(?BRIDGE_TYPE, <<>>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 's3/#'"
>>
})
),
ok = lists:foreach(fun emqx:publish/1, [
emqx_message:make(?FUNCTION_NAME, T1 = <<"s3/m1">>, P1 = <<"[HELLO]">>),
emqx_message:make(?FUNCTION_NAME, T2 = <<"s3/m2">>, P2 = <<"[WORLD]">>),
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
]),
?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}),
%% Check the uploaded objects.
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
%% Verify that column order is respected and event fields are preserved.
?assertMatch(?CONF_COLUMN_ORDER(_), Header),
?assertEqual(
[<<"event">>, <<"qos">>, <<"psize">>],
[C || C <- [<<"event">>, <<"qos">>, <<"psize">>], lists:member(C, Header)]
),
%% Verify that all the matching messages are present.
?assertMatch(
[
[_TS1, ClientID, T1, P1 | _],
[_TS2, ClientID, T2, P2 | _],
[_TS3, ClientID, T3, P3 | _]
],
Rows
),
%% Verify that timestamp column now has RFC3339 format.
[_Row = [TS1 | _] | _Rest] = Rows,
?assert(
is_integer(emqx_rule_funcs:rfc3339_to_unix_ts(TS1, millisecond)),
TS1
).
t_aggreg_upload_restart(Config) ->
%% NOTE
%% This test verifies that the bridge will reuse existing aggregation buffer