chore(mqtt_bridge): change schema to remote `remote` sub-fields and hide `local`

`local` is still needed for backwards compatibility
This commit is contained in:
Thales Macedo Garitezi 2024-01-15 13:55:11 -03:00
parent 007af20a30
commit 938429f351
6 changed files with 101 additions and 54 deletions

View File

@ -451,11 +451,8 @@ source_config_base() ->
<<"connector">> => ?SOURCE_CONNECTOR_NAME,
<<"parameters">> =>
#{
<<"remote">> =>
#{
<<"topic">> => <<"remote/topic">>,
<<"qos">> => 2
}
<<"topic">> => <<"remote/topic">>,
<<"qos">> => 2
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,

View File

@ -127,8 +127,9 @@ on_add_channel(
true ->
ok
end,
ChannelState0 = maps:get(parameters, ChannelConfig),
ChannelState = emqx_bridge_mqtt_egress:config(ChannelState0),
RemoteParams0 = maps:get(parameters, ChannelConfig),
{LocalParams, RemoteParams} = take(local, RemoteParams0, #{}),
ChannelState = emqx_bridge_mqtt_egress:config(#{remote => RemoteParams, local => LocalParams}),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState};
@ -144,15 +145,18 @@ on_add_channel(
#{hookpoints := HookPoints} = ChannelConfig
) ->
%% Add ingress channel
ChannelState0 = maps:get(parameters, ChannelConfig),
ChannelState1 = ChannelState0#{
RemoteParams0 = maps:get(parameters, ChannelConfig),
{LocalParams, RemoteParams} = take(local, RemoteParams0, #{}),
ChannelState0 = #{
hookpoints => HookPoints,
server => Server,
config_root => sources
config_root => sources,
local => LocalParams,
remote => RemoteParams
},
ChannelState2 = mk_ingress_config(ChannelId, ChannelState1, TopicToHandlerIndex),
ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState2),
NewInstalledChannels = maps:put(ChannelId, ChannelState2, InstalledChannels),
ChannelState1 = mk_ingress_config(ChannelId, ChannelState0, TopicToHandlerIndex),
ok = emqx_bridge_mqtt_ingress:subscribe_channel(PoolName, ChannelState1),
NewInstalledChannels = maps:put(ChannelId, ChannelState1, InstalledChannels),
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
@ -500,3 +504,11 @@ connect(Pid, Name) ->
handle_disconnect(_Reason) ->
ok.
take(Key, Map0, Default) ->
case maps:take(Key, Map0) of
{Value, Map} ->
{Value, Map};
error ->
{Default, Map0}
end.

View File

@ -178,11 +178,13 @@ config(#{ingress_list := IngressList} = Conf, Name, TopicToHandlerIndex) ->
],
Conf#{ingress_list => NewIngressList}.
fix_remote_config(#{remote := RC, local := LC}, BridgeName, TopicToHandlerIndex, Conf) ->
FixedConf = Conf#{
remote => parse_remote(RC, BridgeName),
local => emqx_bridge_mqtt_msg:parse(LC)
fix_remote_config(#{remote := RC}, BridgeName, TopicToHandlerIndex, Conf) ->
FixedConf0 = Conf#{
remote => parse_remote(RC, BridgeName)
},
FixedConf = emqx_utils_maps:update_if_present(
local, fun emqx_bridge_mqtt_msg:parse/1, FixedConf0
),
insert_to_topic_to_handler_index(FixedConf, TopicToHandlerIndex, BridgeName),
FixedConf.

View File

@ -95,8 +95,11 @@ bridge_v1_config_to_action_config_helper(
LocalTopicMap = maps:get(<<"local">>, EgressMap0, #{}),
LocalTopic = maps:get(<<"topic">>, LocalTopicMap, undefined),
EgressMap1 = maps:without([<<"local">>, <<"pool_size">>], EgressMap0),
LocalParams = maps:get(<<"local">>, EgressMap0, #{}),
EgressMap2 = emqx_utils_maps:unindent(<<"remote">>, EgressMap1),
EgressMap = maps:put(<<"local">>, LocalParams, EgressMap2),
%% Add parameters field (Egress map) to the action config
ConfigMap2 = maps:put(<<"parameters">>, EgressMap1, ConfigMap1),
ConfigMap2 = maps:put(<<"parameters">>, EgressMap, ConfigMap1),
ConfigMap3 =
case LocalTopic of
undefined ->
@ -107,7 +110,7 @@ bridge_v1_config_to_action_config_helper(
{action, mqtt, ConfigMap3};
bridge_v1_config_to_action_config_helper(
#{
<<"ingress">> := IngressMap
<<"ingress">> := IngressMap0
} = Config,
ConnectorName
) ->
@ -117,9 +120,12 @@ bridge_v1_config_to_action_config_helper(
ConfigMap1 = general_action_conf_map_from_bridge_v1_config(
Config, ConnectorName, SchemaFields, ResourceOptsSchemaFields
),
IngressMap1 = maps:remove(<<"pool_size">>, IngressMap),
IngressMap1 = maps:without([<<"pool_size">>, <<"local">>], IngressMap0),
LocalParams = maps:get(<<"local">>, IngressMap0, #{}),
IngressMap2 = emqx_utils_maps:unindent(<<"remote">>, IngressMap1),
IngressMap = maps:put(<<"local">>, LocalParams, IngressMap2),
%% Add parameters field (Egress map) to the action config
ConfigMap2 = maps:put(<<"parameters">>, IngressMap1, ConfigMap1),
ConfigMap2 = maps:put(<<"parameters">>, IngressMap, ConfigMap1),
{source, mqtt, ConfigMap2};
bridge_v1_config_to_action_config_helper(
_Config,
@ -182,7 +188,7 @@ check_and_simplify_bridge_v1_config(SimplifiedConfig) ->
connector_action_config_to_bridge_v1_config(
ConnectorConfig, ActionConfig
) ->
Params = maps:get(<<"parameters">>, ActionConfig, #{}),
Params0 = maps:get(<<"parameters">>, ActionConfig, #{}),
ResourceOptsConnector = maps:get(<<"resource_opts">>, ConnectorConfig, #{}),
ResourceOptsAction = maps:get(<<"resource_opts">>, ActionConfig, #{}),
ResourceOpts0 = maps:merge(ResourceOptsConnector, ResourceOptsAction),
@ -194,37 +200,54 @@ connector_action_config_to_bridge_v1_config(
ResourceOpts = maps:with(V1ResourceOptsFields, ResourceOpts0),
%% Check the direction of the action
Direction =
case maps:get(<<"remote">>, Params) of
#{<<"retain">> := _} ->
%% Only source has retain
case is_map_key(<<"retain">>, Params0) of
%% Only source has retain
true ->
<<"publisher">>;
_ ->
false ->
<<"subscriber">>
end,
Parms2 = maps:remove(<<"direction">>, Params),
Params1 = maps:remove(<<"direction">>, Params0),
Params = maps:remove(<<"local">>, Params1),
%% hidden; for backwards compatibility
LocalParams = maps:get(<<"local">>, Params1, #{}),
DefaultPoolSize = emqx_connector_schema_lib:pool_size(default),
PoolSize = maps:get(<<"pool_size">>, ConnectorConfig, DefaultPoolSize),
Parms3 = maps:put(<<"pool_size">>, PoolSize, Parms2),
ConnectorConfig2 = maps:remove(<<"pool_size">>, ConnectorConfig),
LocalTopic = maps:get(<<"local_topic">>, ActionConfig, undefined),
BridgeV1Conf0 =
case {Direction, LocalTopic} of
{<<"publisher">>, undefined} ->
#{<<"egress">> => Parms3};
#{
<<"egress">> =>
#{
<<"pool_size">> => PoolSize,
<<"remote">> => Params,
<<"local">> => LocalParams
}
};
{<<"publisher">>, LocalT} ->
#{
<<"egress">> =>
maps:merge(
Parms3, #{
<<"local">> =>
#{
<<"topic">> => LocalT
}
}
)
#{
<<"pool_size">> => PoolSize,
<<"remote">> => Params,
<<"local">> =>
maps:merge(
LocalParams,
#{<<"topic">> => LocalT}
)
}
};
{<<"subscriber">>, _} ->
#{<<"ingress">> => Parms3}
#{
<<"ingress">> =>
#{
<<"pool_size">> => PoolSize,
<<"remote">> => Params,
<<"local">> => LocalParams
}
}
end,
BridgeV1Conf1 = maps:merge(BridgeV1Conf0, ConnectorConfig2),
BridgeV1Conf2 = BridgeV1Conf1#{

View File

@ -56,10 +56,18 @@ fields("mqtt_publisher_action") ->
)
);
fields(action_parameters) ->
Fields0 = emqx_bridge_mqtt_connector_schema:fields("egress"),
Fields1 = proplists:delete(pool_size, Fields0),
Fields2 = proplists:delete(local, Fields1),
Fields2;
[
%% for backwards compatibility
{local,
mk(
ref(emqx_bridge_mqtt_connector_schema, "egress_local"),
#{
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)}
| emqx_bridge_mqtt_connector_schema:fields("egress_remote")
];
fields(source) ->
{mqtt,
mk(
@ -71,8 +79,8 @@ fields(source) ->
)};
fields("mqtt_subscriber_source") ->
emqx_bridge_v2_schema:make_consumer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, ingress_parameters),
mk(
ref(?MODULE, ingress_parameters),
#{
required => true,
desc => ?DESC("source_parameters")
@ -80,10 +88,18 @@ fields("mqtt_subscriber_source") ->
)
);
fields(ingress_parameters) ->
Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"),
Fields1 = proplists:delete(pool_size, Fields0),
%% FIXME: should we make `local` hidden?
Fields1;
[
%% for backwards compatibility
{local,
mk(
ref(emqx_bridge_mqtt_connector_schema, "ingress_local"),
#{
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)}
| emqx_bridge_mqtt_connector_schema:fields("ingress_remote")
];
fields(action_resource_opts) ->
UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter(

View File

@ -108,11 +108,8 @@ source_config(Overrides0) ->
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"remote">> =>
#{
<<"topic">> => <<"remote/topic">>,
<<"qos">> => 2
}
<<"topic">> => <<"remote/topic">>,
<<"qos">> => 2
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
@ -197,7 +194,7 @@ t_receive_via_rule(Config) ->
Hookpoint = hookpoint(Config),
RepublishTopic = <<"rep/t">>,
RemoteTopic = emqx_utils_maps:deep_get(
[<<"parameters">>, <<"remote">>, <<"topic">>],
[<<"parameters">>, <<"topic">>],
SourceConfig
),
RuleOpts = #{