diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl index a6e25a87c..96d924912 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_csv.erl @@ -57,7 +57,7 @@ close(#csv{}) -> %% mk_columns(Record, #csv{order = ColumnOrder}) -> - Columns = lists:sort(maps:keys(Record)), + Columns = [emqx_utils_conv:bin(C) || C <- lists:sort(maps:keys(Record))], Unoredered = Columns -- ColumnOrder, ColumnOrder ++ Unoredered. @@ -81,11 +81,11 @@ emit_row(#{}, [], #csv{delimiter = Delim}) -> [Delim]. emit_cell(Column, Record, CSV) -> - case maps:get(Column, Record, undefined) of - undefined -> - _Empty = ""; - Value -> - encode_cell(emqx_template:to_string(Value), CSV) + case emqx_template:lookup(Column, Record) of + {ok, Value} -> + encode_cell(emqx_template:to_string(Value), CSV); + {error, undefined} -> + _Empty = "" end. encode_cell(V, #csv{quoting_mp = MP}) -> diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 85fdd844b..6577b45ed 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -193,6 +193,56 @@ t_aggreg_upload(Config) -> erl_csv:decode(Content) ). +t_aggreg_upload_rule(Config) -> + Bucket = ?config(s3_bucket, Config), + BridgeName = ?config(bridge_name, Config), + ClientID = emqx_utils_conv:bin(?FUNCTION_NAME), + %% Create a bridge with the sample configuration and a simple SQL rule. + ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), + ?assertMatch( + {ok, _Rule}, + emqx_bridge_v2_testlib:create_rule_and_action_http(?BRIDGE_TYPE, <<>>, Config, #{ + sql => << + "SELECT" + " *," + " strlen(payload) as psize," + " unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at" + " FROM 's3/#'" + >> + }) + ), + ok = lists:foreach(fun emqx:publish/1, [ + emqx_message:make(?FUNCTION_NAME, T1 = <<"s3/m1">>, P1 = <<"[HELLO]">>), + emqx_message:make(?FUNCTION_NAME, T2 = <<"s3/m2">>, P2 = <<"[WORLD]">>), + emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), + emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) + ]), + ?block_until(#{?snk_kind := s3_aggreg_delivery_completed, action := BridgeName}), + %% Check the uploaded objects. + _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), + _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), + %% Verify that column order is respected and event fields are preserved. + ?assertMatch(?CONF_COLUMN_ORDER(_), Header), + ?assertEqual( + [<<"event">>, <<"qos">>, <<"psize">>], + [C || C <- [<<"event">>, <<"qos">>, <<"psize">>], lists:member(C, Header)] + ), + %% Verify that all the matching messages are present. + ?assertMatch( + [ + [_TS1, ClientID, T1, P1 | _], + [_TS2, ClientID, T2, P2 | _], + [_TS3, ClientID, T3, P3 | _] + ], + Rows + ), + %% Verify that timestamp column now has RFC3339 format. + [_Row = [TS1 | _] | _Rest] = Rows, + ?assert( + is_integer(emqx_rule_funcs:rfc3339_to_unix_ts(TS1, millisecond)), + TS1 + ). + t_aggreg_upload_restart(Config) -> %% NOTE %% This test verifies that the bridge will reuse existing aggregation buffer