diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index d24d4feac..fabaadb92 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -451,11 +451,8 @@ source_config_base() -> <<"connector">> => ?SOURCE_CONNECTOR_NAME, <<"parameters">> => #{ - <<"remote">> => - #{ - <<"topic">> => <<"remote/topic">>, - <<"qos">> => 2 - } + <<"topic">> => <<"remote/topic">>, + <<"qos">> => 2 }, <<"resource_opts">> => #{ <<"batch_size">> => 1, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 18af6ee11..9aae73bd2 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -127,8 +127,9 @@ on_add_channel( true -> ok end, - ChannelState0 = maps:get(parameters, ChannelConfig), - ChannelState = emqx_bridge_mqtt_egress:config(ChannelState0), + RemoteParams0 = maps:get(parameters, ChannelConfig), + {LocalParams, RemoteParams} = take(local, RemoteParams0, #{}), + ChannelState = emqx_bridge_mqtt_egress:config(#{remote => RemoteParams, local => LocalParams}), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}; @@ -144,15 +145,18 @@ on_add_channel( #{hookpoints := HookPoints} = ChannelConfig ) -> %% Add ingress channel - ChannelState0 = maps:get(parameters, ChannelConfig), - ChannelState1 = ChannelState0#{ + RemoteParams0 = maps:get(parameters, ChannelConfig), + {LocalParams, RemoteParams} = take(local, RemoteParams0, #{}), + ChannelState0 = #{ hookpoints => HookPoints, server => Server, - config_root => sources + config_root => sources, + local => LocalParams, + remote => RemoteParams }, - ChannelState2 = mk_ingress_config(ChannelId, ChannelState1, TopicToHandlerIndex), - ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState2), - NewInstalledChannels = maps:put(ChannelId, ChannelState2, InstalledChannels), + ChannelState1 = mk_ingress_config(ChannelId, ChannelState0, TopicToHandlerIndex), + ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState1), + NewInstalledChannels = maps:put(ChannelId, ChannelState1, InstalledChannels), NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. @@ -500,3 +504,11 @@ connect(Pid, Name) -> handle_disconnect(_Reason) -> ok. + +take(Key, Map0, Default) -> + case maps:take(Key, Map0) of + {Value, Map} -> + {Value, Map}; + error -> + {Default, Map0} + end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index d59318a84..369238ecf 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -178,11 +178,13 @@ config(#{ingress_list := IngressList} = Conf, Name, TopicToHandlerIndex) -> ], Conf#{ingress_list => NewIngressList}. -fix_remote_config(#{remote := RC, local := LC}, BridgeName, TopicToHandlerIndex, Conf) -> - FixedConf = Conf#{ - remote => parse_remote(RC, BridgeName), - local => emqx_bridge_mqtt_msg:parse(LC) +fix_remote_config(#{remote := RC}, BridgeName, TopicToHandlerIndex, Conf) -> + FixedConf0 = Conf#{ + remote => parse_remote(RC, BridgeName) }, + FixedConf = emqx_utils_maps:update_if_present( + local, fun emqx_bridge_mqtt_msg:parse/1, FixedConf0 + ), insert_to_topic_to_handler_index(FixedConf, TopicToHandlerIndex, BridgeName), FixedConf. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl index e4a4fcd19..cf7a5bc04 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_action_info.erl @@ -95,8 +95,11 @@ bridge_v1_config_to_action_config_helper( LocalTopicMap = maps:get(<<"local">>, EgressMap0, #{}), LocalTopic = maps:get(<<"topic">>, LocalTopicMap, undefined), EgressMap1 = maps:without([<<"local">>, <<"pool_size">>], EgressMap0), + LocalParams = maps:get(<<"local">>, EgressMap0, #{}), + EgressMap2 = emqx_utils_maps:unindent(<<"remote">>, EgressMap1), + EgressMap = maps:put(<<"local">>, LocalParams, EgressMap2), %% Add parameters field (Egress map) to the action config - ConfigMap2 = maps:put(<<"parameters">>, EgressMap1, ConfigMap1), + ConfigMap2 = maps:put(<<"parameters">>, EgressMap, ConfigMap1), ConfigMap3 = case LocalTopic of undefined -> @@ -107,7 +110,7 @@ bridge_v1_config_to_action_config_helper( {action, mqtt, ConfigMap3}; bridge_v1_config_to_action_config_helper( #{ - <<"ingress">> := IngressMap + <<"ingress">> := IngressMap0 } = Config, ConnectorName ) -> @@ -117,9 +120,12 @@ bridge_v1_config_to_action_config_helper( ConfigMap1 = general_action_conf_map_from_bridge_v1_config( Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields ), - IngressMap1 = maps:remove(<<"pool_size">>, IngressMap), + IngressMap1 = maps:without([<<"pool_size">>, <<"local">>], IngressMap0), + LocalParams = maps:get(<<"local">>, IngressMap0, #{}), + IngressMap2 = emqx_utils_maps:unindent(<<"remote">>, IngressMap1), + IngressMap = maps:put(<<"local">>, LocalParams, IngressMap2), %% Add parameters field (Egress map) to the action config - ConfigMap2 = maps:put(<<"parameters">>, IngressMap1, ConfigMap1), + ConfigMap2 = maps:put(<<"parameters">>, IngressMap, ConfigMap1), {source, mqtt, ConfigMap2}; bridge_v1_config_to_action_config_helper( _Config, @@ -182,7 +188,7 @@ check_and_simplify_bridge_v1_config(SimplifiedConfig) -> connector_action_config_to_bridge_v1_config( ConnectorConfig, ActionConfig ) -> - Params = maps:get(<<"parameters">>, ActionConfig, #{}), + Params0 = maps:get(<<"parameters">>, ActionConfig, #{}), ResourceOptsConnector = maps:get(<<"resource_opts">>, ConnectorConfig, #{}), ResourceOptsAction = maps:get(<<"resource_opts">>, ActionConfig, #{}), ResourceOpts0 = maps:merge(ResourceOptsConnector, ResourceOptsAction), @@ -194,37 +200,54 @@ connector_action_config_to_bridge_v1_config( ResourceOpts = maps:with(V1ResourceOptsFields, ResourceOpts0), %% Check the direction of the action Direction = - case maps:get(<<"remote">>, Params) of - #{<<"retain">> := _} -> - %% Only source has retain + case is_map_key(<<"retain">>, Params0) of + %% Only source has retain + true -> <<"publisher">>; - _ -> + false -> <<"subscriber">> end, - Parms2 = maps:remove(<<"direction">>, Params), + Params1 = maps:remove(<<"direction">>, Params0), + Params = maps:remove(<<"local">>, Params1), + %% hidden; for backwards compatibility + LocalParams = maps:get(<<"local">>, Params1, #{}), DefaultPoolSize = emqx_connector_schema_lib:pool_size(default), PoolSize = maps:get(<<"pool_size">>, ConnectorConfig, DefaultPoolSize), - Parms3 = maps:put(<<"pool_size">>, PoolSize, Parms2), ConnectorConfig2 = maps:remove(<<"pool_size">>, ConnectorConfig), LocalTopic = maps:get(<<"local_topic">>, ActionConfig, undefined), BridgeV1Conf0 = case {Direction, LocalTopic} of {<<"publisher">>, undefined} -> - #{<<"egress">> => Parms3}; + #{ + <<"egress">> => + #{ + <<"pool_size">> => PoolSize, + <<"remote">> => Params, + <<"local">> => LocalParams + } + }; {<<"publisher">>, LocalT} -> #{ <<"egress">> => - maps:merge( - Parms3, #{ - <<"local">> => - #{ - <<"topic">> => LocalT - } - } - ) + #{ + <<"pool_size">> => PoolSize, + <<"remote">> => Params, + <<"local">> => + maps:merge( + LocalParams, + #{<<"topic">> => LocalT} + ) + } }; {<<"subscriber">>, _} -> - #{<<"ingress">> => Parms3} + #{ + <<"ingress">> => + #{ + <<"pool_size">> => PoolSize, + <<"remote">> => Params, + <<"local">> => LocalParams + } + } end, BridgeV1Conf1 = maps:merge(BridgeV1Conf0, ConnectorConfig2), BridgeV1Conf2 = BridgeV1Conf1#{ diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index b4c2b63ba..4cf092a60 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -56,10 +56,18 @@ fields("mqtt_publisher_action") -> ) ); fields(action_parameters) -> - Fields0 = emqx_bridge_mqtt_connector_schema:fields("egress"), - Fields1 = proplists:delete(pool_size, Fields0), - Fields2 = proplists:delete(local, Fields1), - Fields2; + [ + %% for backwards compatibility + {local, + mk( + ref(emqx_bridge_mqtt_connector_schema, "egress_local"), + #{ + default => #{}, + importance => ?IMPORTANCE_HIDDEN + } + )} + | emqx_bridge_mqtt_connector_schema:fields("egress_remote") + ]; fields(source) -> {mqtt, mk( @@ -71,8 +79,8 @@ fields(source) -> )}; fields("mqtt_subscriber_source") -> emqx_bridge_v2_schema:make_consumer_action_schema( - hoconsc:mk( - hoconsc:ref(?MODULE, ingress_parameters), + mk( + ref(?MODULE, ingress_parameters), #{ required => true, desc => ?DESC("source_parameters") @@ -80,10 +88,18 @@ fields("mqtt_subscriber_source") -> ) ); fields(ingress_parameters) -> - Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"), - Fields1 = proplists:delete(pool_size, Fields0), - %% FIXME: should we make `local` hidden? - Fields1; + [ + %% for backwards compatibility + {local, + mk( + ref(emqx_bridge_mqtt_connector_schema, "ingress_local"), + #{ + default => #{}, + importance => ?IMPORTANCE_HIDDEN + } + )} + | emqx_bridge_mqtt_connector_schema:fields("ingress_remote") + ]; fields(action_resource_opts) -> UnsupportedOpts = [enable_batch, batch_size, batch_time], lists:filter( diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index 5569a826b..a0b3edfa7 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -108,11 +108,8 @@ source_config(Overrides0) -> <<"connector">> => <<"please override">>, <<"parameters">> => #{ - <<"remote">> => - #{ - <<"topic">> => <<"remote/topic">>, - <<"qos">> => 2 - } + <<"topic">> => <<"remote/topic">>, + <<"qos">> => 2 }, <<"resource_opts">> => #{ <<"batch_size">> => 1, @@ -197,7 +194,7 @@ t_receive_via_rule(Config) -> Hookpoint = hookpoint(Config), RepublishTopic = <<"rep/t">>, RemoteTopic = emqx_utils_maps:deep_get( - [<<"parameters">>, <<"remote">>, <<"topic">>], + [<<"parameters">>, <<"topic">>], SourceConfig ), RuleOpts = #{