fix(gcp_pubsub_consumer): fabricate topic mapping when producing v1 config
Fixes https://emqx.atlassian.net/browse/EMQX-11891
This commit is contained in:
parent
7bb9d5d8f6
commit
205b97f732
|
@ -692,7 +692,6 @@ t_consume(Config, Opts) ->
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
ok = add_source_hookpoint(Config),
|
ok = add_source_hookpoint(Config),
|
||||||
ResourceId = resource_id(Config),
|
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep = 200,
|
_Sleep = 200,
|
||||||
_Attempts = 20,
|
_Attempts = 20,
|
||||||
|
@ -732,6 +731,13 @@ t_create_via_http(Config) ->
|
||||||
Config
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
%% check that v1 list API is fine
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, _}},
|
||||||
|
list_bridges_http_api_v1()
|
||||||
|
),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -60,8 +60,9 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, SourceConfig) ->
|
||||||
fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end,
|
fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end,
|
||||||
BridgeV1Config2
|
BridgeV1Config2
|
||||||
),
|
),
|
||||||
BridgeV1Config4 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"topic">>], BridgeV1Config3),
|
BridgeV1Config4 = maybe_fabricate_topic_mapping(BridgeV1Config3),
|
||||||
emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config4).
|
BridgeV1Config = emqx_utils_maps:deep_remove([<<"parameters">>, <<"topic">>], BridgeV1Config4),
|
||||||
|
emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------------------
|
||||||
%% Internal helper fns
|
%% Internal helper fns
|
||||||
|
@ -74,6 +75,23 @@ maybe_set_pubsub_topic(#{<<"topic_mapping">> := [#{<<"pubsub_topic">> := Topic}
|
||||||
maybe_set_pubsub_topic(Params) ->
|
maybe_set_pubsub_topic(Params) ->
|
||||||
Params.
|
Params.
|
||||||
|
|
||||||
|
%% The old schema requires `topic_mapping', which is now hidden.
|
||||||
|
maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
|
||||||
|
#{<<"topic">> := Topic} = Params0,
|
||||||
|
case maps:get(<<"topic_mapping">>, Params0, undefined) of
|
||||||
|
[_ | _] ->
|
||||||
|
BridgeV1Config0;
|
||||||
|
_ ->
|
||||||
|
%% Have to fabricate an MQTT topic, unfortunately... QoS and payload already
|
||||||
|
%% have defaults.
|
||||||
|
FakeTopicMapping = #{
|
||||||
|
<<"pubsub_topic">> => Topic,
|
||||||
|
<<"mqtt_topic">> => <<>>
|
||||||
|
},
|
||||||
|
Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
|
||||||
|
BridgeV1Config0#{<<"parameters">> := Params}
|
||||||
|
end.
|
||||||
|
|
||||||
resource_opts_fields() ->
|
resource_opts_fields() ->
|
||||||
[
|
[
|
||||||
to_bin(K)
|
to_bin(K)
|
||||||
|
|
|
@ -783,7 +783,7 @@ legacy_maybe_publish_mqtt_message(
|
||||||
},
|
},
|
||||||
SourceResId,
|
SourceResId,
|
||||||
FullMessage
|
FullMessage
|
||||||
) ->
|
) when MQTTTopic =/= <<>> ->
|
||||||
Payload = render(FullMessage, PayloadTemplate),
|
Payload = render(FullMessage, PayloadTemplate),
|
||||||
MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
|
MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
|
||||||
_ = emqx:publish(MQTTMessage),
|
_ = emqx:publish(MQTTMessage),
|
||||||
|
|
|
@ -35,7 +35,8 @@ common_init_per_testcase(TestCase, Config0) ->
|
||||||
ServiceAccountJSON =
|
ServiceAccountJSON =
|
||||||
#{<<"project_id">> := ProjectId} =
|
#{<<"project_id">> := ProjectId} =
|
||||||
emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
|
emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
|
||||||
Name = atom_to_binary(TestCase),
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||||
|
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
|
||||||
ConnectorConfig = connector_config(Name, ServiceAccountJSON),
|
ConnectorConfig = connector_config(Name, ServiceAccountJSON),
|
||||||
PubsubTopic = Name,
|
PubsubTopic = Name,
|
||||||
SourceConfig = source_config(#{
|
SourceConfig = source_config(#{
|
||||||
|
@ -117,6 +118,10 @@ t_start_stop(Config) ->
|
||||||
ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
|
ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_create_via_http(Config) ->
|
||||||
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_consume(Config) ->
|
t_consume(Config) ->
|
||||||
Topic = ?config(pubsub_topic, Config),
|
Topic = ?config(pubsub_topic, Config),
|
||||||
Payload = #{<<"key">> => <<"value">>},
|
Payload = #{<<"key">> => <<"value">>},
|
||||||
|
|
Loading…
Reference in New Issue