From 23f5cea4826677c720b15063be2a02a03f4aa73e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 14 Aug 2023 13:39:38 -0300 Subject: [PATCH] feat: handle strange key values when resolving placeholders --- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 41 ++++++- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 115 +++++++++++++++++- 2 files changed, 150 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index f80fdc333..dc5eb01aa 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -219,7 +219,7 @@ encode_payload(State, Selected) -> payload_template := PayloadTemplate } = State, Data = render_payload(PayloadTemplate, Selected), - OrderingKey = render(OrderingKeyTemplate, Selected), + OrderingKey = render_key(OrderingKeyTemplate, Selected), Attributes = proc_attributes(AttributesTemplate, Selected), Payload0 = #{data => base64:encode(Data)}, Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0), @@ -234,9 +234,40 @@ put_if(Acc, _K, _V, false) -> render_payload([] = _Template, Selected) -> emqx_utils_json:encode(Selected); render_payload(Template, Selected) -> - render(Template, Selected). + render_value(Template, Selected). -render(Template, Selected) -> +render_key(Template, Selected) -> + Opts = #{ + return => full_binary, + var_trans => fun + (_Var, undefined) -> + <<>>; + (Var, X) when is_boolean(X) -> + throw({bad_value_for_key, Var, X}); + (_Var, X) when is_binary(X); is_number(X); is_atom(X) -> + emqx_utils_conv:bin(X); + (Var, X) -> + throw({bad_value_for_key, Var, X}) + end + }, + try + emqx_placeholder:proc_tmpl(Template, Selected, Opts) + catch + throw:{bad_value_for_key, Var, X} -> + ?tp( + warning, + "gcp_pubsub_producer_bad_value_for_key", + #{ + placeholder => Var, + value => X, + action => "key ignored", + hint => "only plain values like strings and numbers can be used in keys" + } + ), + <<>> + end. + +render_value(Template, Selected) -> Opts = #{ return => full_binary, var_trans => fun @@ -264,12 +295,12 @@ preproc_attributes(AttributesTemplate) -> proc_attributes(AttributesTemplate, Selected) -> maps:fold( fun(KT, VT, Acc) -> - K = render(KT, Selected), + K = render_key(KT, Selected), case K =:= <<>> of true -> Acc; false -> - V = render(VT, Selected), + V = render_value(VT, Selected), Acc#{K => V} end end, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 74affb9c8..acfe3df8b 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -64,7 +64,8 @@ single_config_tests() -> t_get_status_no_worker, t_get_status_timeout_calling_workers, t_on_start_ehttpc_pool_already_started, - t_attributes + t_attributes, + t_bad_attributes ]. only_sync_tests() -> @@ -484,9 +485,15 @@ assert_http_request(ServiceAccountJSON) -> error({timeout, #{mailbox => Mailbox}}) end. +receive_http_requests(ServiceAccountJSON, Opts) -> + Default = #{n => 1}, + #{n := N} = maps:merge(Default, Opts), + lists:flatmap(fun(_) -> receive_http_request(ServiceAccountJSON) end, lists:seq(1, N)). + receive_http_request(ServiceAccountJSON) -> receive {http, Headers, Body} -> + ct:pal("received publish:\n ~p", [#{headers => Headers, body => Body}]), assert_valid_request_headers(Headers, ServiceAccountJSON), #{<<"messages">> := Msgs} = emqx_utils_json:decode(Body, [return_maps]), lists:map( @@ -1689,3 +1696,109 @@ t_attributes(Config) -> [] ), ok. + +t_bad_attributes(Config) -> + ServiceAccountJSON = ?config(service_account_json, Config), + LocalTopic = <<"t/topic">>, + ?check_trace( + begin + {ok, _} = create_bridge_http( + Config, + #{ + <<"local_topic">> => LocalTopic, + <<"attributes_template">> => + [ + #{ + <<"key">> => <<"${.payload.key}">>, + <<"value">> => <<"${.payload.value}">> + } + ], + <<"ordering_key_template">> => <<"${.payload.ok}">> + } + ), + %% Ok: attribute value is a map or list + lists:foreach( + fun(OkValue) -> + Payload0 = + emqx_utils_json:encode( + #{ + <<"ok">> => <<"ord_key">>, + <<"value">> => OkValue, + <<"key">> => <<"attr_key">> + } + ), + Message0 = emqx_message:make(LocalTopic, Payload0), + emqx:publish(Message0) + end, + [ + #{<<"some">> => <<"map">>}, + [1, <<"str">>, #{<<"deep">> => true}] + ] + ), + DecodedMessages0 = receive_http_requests(ServiceAccountJSON, #{n => 1}), + ?assertMatch( + [ + #{ + <<"attributes">> := + #{<<"attr_key">> := <<"{\"some\":\"map\"}">>}, + <<"orderingKey">> := <<"ord_key">> + }, + #{ + <<"attributes">> := + #{<<"attr_key">> := <<"[1,\"str\",{\"deep\":true}]">>}, + <<"orderingKey">> := <<"ord_key">> + } + ], + DecodedMessages0 + ), + %% Bad: key is not a plain value + lists:foreach( + fun(BadKey) -> + Payload1 = + emqx_utils_json:encode( + #{ + <<"value">> => <<"v">>, + <<"key">> => BadKey, + <<"ok">> => BadKey + } + ), + Message1 = emqx_message:make(LocalTopic, Payload1), + emqx:publish(Message1) + end, + [ + #{<<"some">> => <<"map">>}, + [1, <<"list">>, true], + true, + false + ] + ), + DecodedMessages1 = receive_http_request(ServiceAccountJSON), + lists:foreach( + fun(DMsg) -> + ?assertNot(is_map_key(<<"orderingKey">>, DMsg), #{decoded_message => DMsg}), + ?assertNot(is_map_key(<<"attributes">>, DMsg), #{decoded_message => DMsg}), + ok + end, + DecodedMessages1 + ), + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + ?assertMatch( + [ + #{placeholder := [<<"payload">>, <<"ok">>], value := #{}}, + #{placeholder := [<<"payload">>, <<"key">>], value := #{}}, + #{placeholder := [<<"payload">>, <<"ok">>], value := [_ | _]}, + #{placeholder := [<<"payload">>, <<"key">>], value := [_ | _]}, + #{placeholder := [<<"payload">>, <<"ok">>], value := true}, + #{placeholder := [<<"payload">>, <<"key">>], value := true}, + #{placeholder := [<<"payload">>, <<"ok">>], value := false}, + #{placeholder := [<<"payload">>, <<"key">>], value := false} + ], + ?of_kind("gcp_pubsub_producer_bad_value_for_key", Trace) + ), + ok + end + ), + ok.