refactor: unify top level field names for bridge v2

commit 9a26c03a5c (parent 6af6309301)
@@ -900,7 +900,7 @@ format_resource(
        case emqx_bridge_v2:is_bridge_v2_type(Type) of
            true ->
                %% The defaults are already filled in
-               RawConf;
+               downgrade_raw_conf(Type, RawConf);
            false ->
                fill_defaults(Type, RawConf)
        end,
@@ -1164,3 +1164,19 @@ upgrade_type(Type) ->
 
 downgrade_type(Type) ->
     emqx_bridge_lib:downgrade_type(Type).
+
+%% TODO: move it to callback
+downgrade_raw_conf(kafka_producer, RawConf) ->
+    rename(<<"parameters">>, <<"kafka">>, RawConf);
+downgrade_raw_conf(azure_event_hub_producer, RawConf) ->
+    rename(<<"parameters">>, <<"kafka">>, RawConf);
+downgrade_raw_conf(_Type, RawConf) ->
+    RawConf.
+
+rename(OldKey, NewKey, Map) ->
+    case maps:find(OldKey, Map) of
+        {ok, Value} ->
+            maps:remove(OldKey, maps:put(NewKey, Value, Map));
+        error ->
+            Map
+    end.
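Aside, not part of the patch: rename/3 above is a plain map-key move that leaves the map unchanged when the old key is absent, which is what lets downgrade_raw_conf/2 present the unified 'parameters' field under its legacy 'kafka' name in API responses. A minimal sketch of that expected behavior; the module name is illustrative:

    -module(rename_sketch).
    -export([demo/0]).

    %% Same shape as the helper added above.
    rename(OldKey, NewKey, Map) ->
        case maps:find(OldKey, Map) of
            {ok, Value} -> maps:remove(OldKey, maps:put(NewKey, Value, Map));
            error -> Map
        end.

    demo() ->
        V2 = #{<<"parameters">> => #{<<"topic">> => <<"t">>}, <<"enable">> => true},
        %% downgrade: 'parameters' is shown as 'kafka' again
        #{<<"kafka">> := #{}, <<"enable">> := true} = rename(<<"parameters">>, <<"kafka">>, V2),
        %% absent key: the map comes back unchanged
        #{<<"enable">> := true} = rename(<<"missing">>, <<"other">>, #{<<"enable">> => true}),
        ok.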
@@ -164,9 +164,8 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     end.
 
 common_field_names() ->
-    %% TODO: add 'config' to the list
     [
-        enable, description, local_topic, connector, resource_opts
+        enable, description, local_topic, connector, resource_opts, parameters
     ].
 
 -endif.
@@ -82,7 +82,7 @@ fields("config_bridge_v2") ->
     fields(bridge_v2);
 fields("config_connector") ->
     Fields = override(
-        emqx_bridge_kafka:fields(kafka_connector),
+        emqx_bridge_kafka:fields("config_connector"),
         connector_overrides()
     ),
     override_documentations(Fields);
@@ -117,7 +117,7 @@ fields(kafka_message) ->
 fields(bridge_v2) ->
     Fields =
         override(
-            emqx_bridge_kafka:fields(producer_opts),
+            emqx_bridge_kafka:producer_opts(),
             bridge_v2_overrides()
         ) ++
             [
@@ -267,7 +267,7 @@ values(common_config) ->
     };
 values(producer) ->
     #{
-        kafka => #{
+        parameters => #{
            topic => <<"topic">>,
            message => #{
                key => <<"${.clientid}">>,
@@ -378,18 +378,27 @@ producer_overrides() ->
                )
            }
        ),
+       %% NOTE: field 'kafka' is renamed to 'parameters' since e5.3.1
+       %% We will keep 'kafka' for backward compatibility.
+       %% TODO: delete this override when we upgrade bridge schema json to 0.2.0
+       %% See emqx_conf:bridge_schema_json/0
        kafka =>
            mk(ref(producer_kafka_opts), #{
                required => true,
                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
            }),
+       parameters =>
+           mk(ref(producer_kafka_opts), #{
+               required => true,
+               validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+           }),
        ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
        type => mk(azure_event_hub_producer, #{required => true})
    }.
 
 bridge_v2_overrides() ->
    #{
-       kafka =>
+       parameters =>
            mk(ref(producer_kafka_opts), #{
                required => true,
                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
@@ -28,10 +28,14 @@
     fields/1,
     desc/1,
     host_opts/0,
-    ssl_client_opts_fields/0
+    ssl_client_opts_fields/0,
+    producer_opts/0
 ]).
 
--export([kafka_producer_converter/2, producer_strategy_key_validator/1]).
+-export([
+    kafka_producer_converter/2,
+    producer_strategy_key_validator/1
+]).
 
 %% -------------------------------------------------------------------------------------------------
 %% api
@@ -251,15 +255,13 @@ fields("get_" ++ Type) ->
 fields("config_bridge_v2") ->
     fields(kafka_producer_action);
 fields("config_connector") ->
-    fields(kafka_connector);
+    connector_config_fields();
 fields("config_producer") ->
     fields(kafka_producer);
 fields("config_consumer") ->
     fields(kafka_consumer);
-fields(kafka_connector) ->
-    fields("config");
 fields(kafka_producer) ->
-    fields("config") ++ fields(producer_opts);
+    connector_config_fields() ++ producer_opts();
 fields(kafka_producer_action) ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -268,49 +270,9 @@ fields(kafka_producer_action) ->
                desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
            })},
        {description, emqx_schema:description_schema()}
-    ] ++ fields(producer_opts);
+    ] ++ producer_opts();
 fields(kafka_consumer) ->
-    fields("config") ++ fields(consumer_opts);
-fields("config") ->
-    [
-        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
-        {description, emqx_schema:description_schema()},
-        {bootstrap_hosts,
-            mk(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC(bootstrap_hosts),
-                    validator => emqx_schema:servers_validator(
-                        host_opts(), _Required = true
-                    )
-                }
-            )},
-        {connect_timeout,
-            mk(emqx_schema:timeout_duration_ms(), #{
-                default => <<"5s">>,
-                desc => ?DESC(connect_timeout)
-            })},
-        {min_metadata_refresh_interval,
-            mk(
-                emqx_schema:timeout_duration_ms(),
-                #{
-                    default => <<"3s">>,
-                    desc => ?DESC(min_metadata_refresh_interval)
-                }
-            )},
-        {metadata_request_timeout,
-            mk(emqx_schema:timeout_duration_ms(), #{
-                default => <<"5s">>,
-                desc => ?DESC(metadata_request_timeout)
-            })},
-        {authentication,
-            mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
-                default => none, desc => ?DESC("authentication")
-            })},
-        {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
-        {ssl, mk(ref(ssl_client_opts), #{})}
-    ];
+    connector_config_fields() ++ fields(consumer_opts);
 fields(ssl_client_opts) ->
     ssl_client_opts_fields();
 fields(auth_username_password) ->
@@ -369,20 +331,6 @@ fields(socket_opts) ->
                validator => fun emqx_schema:validate_tcp_keepalive/1
            })}
    ];
-fields(producer_opts) ->
-    [
-        %% Note: there's an implicit convention in `emqx_bridge' that,
-        %% for egress bridges with this config, the published messages
-        %% will be forwarded to such bridges.
-        {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
-        {kafka,
-            mk(ref(producer_kafka_opts), #{
-                required => true,
-                desc => ?DESC(producer_kafka_opts),
-                validator => fun producer_strategy_key_validator/1
-            })},
-        {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
-    ];
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
@@ -580,7 +528,7 @@ fields(resource_opts) ->
     CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
     lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
 
-desc("config") ->
+desc("config_connector") ->
     ?DESC("desc_config");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
|
@ -599,34 +547,86 @@ desc("post_" ++ Type) when
|
||||||
desc(kafka_producer_action) ->
|
desc(kafka_producer_action) ->
|
||||||
?DESC("kafka_producer_action");
|
?DESC("kafka_producer_action");
|
||||||
desc(Name) ->
|
desc(Name) ->
|
||||||
lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
|
|
||||||
?DESC(Name).
|
?DESC(Name).
|
||||||
|
|
||||||
struct_names() ->
|
connector_config_fields() ->
|
||||||
[
|
[
|
||||||
auth_gssapi_kerberos,
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
auth_username_password,
|
{description, emqx_schema:description_schema()},
|
||||||
kafka_message,
|
{bootstrap_hosts,
|
||||||
kafka_producer,
|
mk(
|
||||||
kafka_consumer,
|
binary(),
|
||||||
producer_buffer,
|
#{
|
||||||
producer_kafka_opts,
|
required => true,
|
||||||
socket_opts,
|
desc => ?DESC(bootstrap_hosts),
|
||||||
producer_opts,
|
validator => emqx_schema:servers_validator(
|
||||||
consumer_opts,
|
host_opts(), _Required = true
|
||||||
consumer_kafka_opts,
|
)
|
||||||
consumer_topic_mapping,
|
}
|
||||||
producer_kafka_ext_headers,
|
)},
|
||||||
ssl_client_opts
|
{connect_timeout,
|
||||||
|
mk(emqx_schema:timeout_duration_ms(), #{
|
||||||
|
default => <<"5s">>,
|
||||||
|
desc => ?DESC(connect_timeout)
|
||||||
|
})},
|
||||||
|
{min_metadata_refresh_interval,
|
||||||
|
mk(
|
||||||
|
emqx_schema:timeout_duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"3s">>,
|
||||||
|
desc => ?DESC(min_metadata_refresh_interval)
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
{metadata_request_timeout,
|
||||||
|
mk(emqx_schema:timeout_duration_ms(), #{
|
||||||
|
default => <<"5s">>,
|
||||||
|
desc => ?DESC(metadata_request_timeout)
|
||||||
|
})},
|
||||||
|
{authentication,
|
||||||
|
mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
|
||||||
|
default => none, desc => ?DESC("authentication")
|
||||||
|
})},
|
||||||
|
{socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
|
||||||
|
{ssl, mk(ref(ssl_client_opts), #{})}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
producer_opts() ->
|
||||||
|
[
|
||||||
|
%% Note: there's an implicit convention in `emqx_bridge' that,
|
||||||
|
%% for egress bridges with this config, the published messages
|
||||||
|
%% will be forwarded to such bridges.
|
||||||
|
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
|
||||||
|
parameters_field(),
|
||||||
|
{resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
|
||||||
|
].
|
||||||
|
|
||||||
|
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
|
||||||
|
%% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0)
|
||||||
|
%% since schema is data for the 'schemas' API.
|
||||||
|
parameters_field() ->
|
||||||
|
{Name, Alias} =
|
||||||
|
case get(emqx_bridge_schema_version) of
|
||||||
|
<<"0.1.0">> ->
|
||||||
|
{kafka, parameters};
|
||||||
|
_ ->
|
||||||
|
{parameters, kafka}
|
||||||
|
end,
|
||||||
|
{Name,
|
||||||
|
mk(ref(producer_kafka_opts), #{
|
||||||
|
required => true,
|
||||||
|
aliases => [Alias],
|
||||||
|
desc => ?DESC(producer_kafka_opts),
|
||||||
|
validator => fun producer_strategy_key_validator/1
|
||||||
|
})}.
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% internal
|
%% internal
|
||||||
type_field(BridgeV2Type) when BridgeV2Type =:= "connector"; BridgeV2Type =:= "bridge_v2" ->
|
type_field(BridgeV2Type) when BridgeV2Type =:= "connector"; BridgeV2Type =:= "bridge_v2" ->
|
||||||
{type, mk(enum([kafka_producer]), #{required => true, desc => ?DESC("desc_type")})};
|
{type, mk(enum([kafka_producer]), #{required => true, desc => ?DESC("desc_type")})};
|
||||||
type_field(_) ->
|
type_field(_) ->
|
||||||
{type,
|
{type,
|
||||||
mk(enum([kafka_consumer, kafka, kafka_producer]), #{
|
%% 'kafka' is kept for backward compatibility
|
||||||
|
mk(enum([kafka, kafka_producer, kafka_consumer]), #{
|
||||||
required => true, desc => ?DESC("desc_type")
|
required => true, desc => ?DESC("desc_type")
|
||||||
})}.
|
})}.
|
||||||
|
|
||||||
|
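Aside, not part of the patch: parameters_field/0 keys the primary field name off a process-dictionary flag and registers the other spelling as a HOCON alias, so configs written with either 'kafka' or 'parameters' still parse. A sketch of just the name/alias switch, with an illustrative module name:

    -module(parameters_field_sketch).
    -export([demo/0]).

    %% Mirrors only the selection logic in parameters_field/0.
    name_and_alias() ->
        case get(emqx_bridge_schema_version) of
            <<"0.1.0">> -> {kafka, parameters};
            _ -> {parameters, kafka}
        end.

    demo() ->
        %% runtime default: the new name is primary, the legacy name is the alias
        {parameters, kafka} = name_and_alias(),
        %% while generating the 0.1.0 schema JSON, the legacy name stays primary
        put(emqx_bridge_schema_version, <<"0.1.0">>),
        {kafka, parameters} = name_and_alias(),
        erase(emqx_bridge_schema_version),
        ok.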
@@ -641,17 +641,23 @@ kafka_producer_converter(undefined, _HoconOpts) ->
 kafka_producer_converter(
     #{<<"producer">> := OldOpts0, <<"bootstrap_hosts">> := _} = Config0, _HoconOpts
 ) ->
-    %% old schema
+    %% prior to e5.0.2
     MQTTOpts = maps:get(<<"mqtt">>, OldOpts0, #{}),
     LocalTopic = maps:get(<<"topic">>, MQTTOpts, undefined),
     KafkaOpts = maps:get(<<"kafka">>, OldOpts0),
     Config = maps:without([<<"producer">>], Config0),
     case LocalTopic =:= undefined of
         true ->
-            Config#{<<"kafka">> => KafkaOpts};
+            Config#{<<"parameters">> => KafkaOpts};
         false ->
-            Config#{<<"kafka">> => KafkaOpts, <<"local_topic">> => LocalTopic}
+            Config#{<<"parameters">> => KafkaOpts, <<"local_topic">> => LocalTopic}
     end;
+kafka_producer_converter(
+    #{<<"kafka">> := _} = Config0, _HoconOpts
+) ->
+    %% from e5.0.2 to e5.3.0
+    {KafkaOpts, Config} = maps:take(<<"kafka">>, Config0),
+    Config#{<<"parameters">> => KafkaOpts};
 kafka_producer_converter(Config, _HoconOpts) ->
     %% new schema
     Config.
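Aside, not part of the patch: the converter now normalizes three generations of producer config onto the same 'parameters' key. A self-contained sketch of that normalization; the pre-e5.0.2 mqtt/local_topic handling is elided and the module name is illustrative:

    -module(converter_sketch).
    -export([demo/0]).

    %% pre-e5.0.2: kafka opts nested under 'producer'
    convert(#{<<"producer">> := Old} = Config0) ->
        Config = maps:without([<<"producer">>], Config0),
        Config#{<<"parameters">> => maps:get(<<"kafka">>, Old)};
    %% e5.0.2 .. e5.3.0: top-level 'kafka'
    convert(#{<<"kafka">> := _} = Config0) ->
        {KafkaOpts, Config} = maps:take(<<"kafka">>, Config0),
        Config#{<<"parameters">> => KafkaOpts};
    %% e5.3.1 and later: already uses 'parameters'
    convert(Config) ->
        Config.

    demo() ->
        Mid = #{<<"kafka">> => #{<<"topic">> => <<"t">>}, <<"enable">> => true},
        #{<<"parameters">> := #{<<"topic">> := <<"t">>}} = convert(Mid),
        New = #{<<"parameters">> => #{<<"topic">> => <<"t">>}},
        New = convert(New),
        ok.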
@@ -35,7 +35,7 @@
 -define(kafka_client_id, kafka_client_id).
 -define(kafka_producers, kafka_producers).
 
-query_mode(#{kafka := #{query_mode := sync}}) ->
+query_mode(#{parameters := #{query_mode := sync}}) ->
     simple_sync_internal_buffer;
 query_mode(_) ->
     simple_async_internal_buffer.
@@ -111,7 +111,7 @@ create_producers_for_bridge_v2(
     ClientId,
     #{
         bridge_type := BridgeType,
-        kafka := KafkaConfig
+        parameters := KafkaConfig
     }
 ) ->
     #{
@@ -6,6 +6,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+-export([atoms/0]).
+%% ensure atoms exist
+atoms() -> [myproducer, my_consumer].
+
 %%===========================================================================
 %% Test cases
 %%===========================================================================
@@ -14,7 +18,6 @@ kafka_producer_test() ->
     Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
     Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
     Conf3 = parse(kafka_producer_new_hocon()),
-
     ?assertMatch(
         #{
             <<"bridges">> :=
@@ -22,7 +25,7 @@ kafka_producer_test() ->
                <<"kafka_producer">> :=
                    #{
                        <<"myproducer">> :=
-                           #{<<"kafka">> := #{}}
+                           #{<<"parameters">> := #{}}
                    }
            }
        },
@@ -49,7 +52,7 @@ kafka_producer_test() ->
                    #{
                        <<"myproducer">> :=
                            #{
-                               <<"kafka">> := #{},
+                               <<"parameters">> := #{},
                                <<"local_topic">> := <<"mqtt/local">>
                            }
                    }
@@ -65,7 +68,7 @@ kafka_producer_test() ->
                    #{
                        <<"myproducer">> :=
                            #{
-                               <<"kafka">> := #{},
+                               <<"parameters">> := #{},
                                <<"local_topic">> := <<"mqtt/local">>
                            }
                    }
@@ -156,12 +159,14 @@ message_key_dispatch_validations_test() ->
                <<"message">> := #{<<"key">> := <<>>}
            }
        },
-       emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf)
+       emqx_utils_maps:deep_get(
+           [<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf
+       )
    ),
    ?assertThrow(
        {_, [
            #{
-               path := "bridges.kafka_producer.myproducer.kafka",
+               path := "bridges.kafka_producer.myproducer.parameters",
                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
            }
        ]},
@@ -170,7 +175,7 @@ message_key_dispatch_validations_test() ->
    ?assertThrow(
        {_, [
            #{
-               path := "bridges.kafka_producer.myproducer.kafka",
+               path := "bridges.kafka_producer.myproducer.parameters",
                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
            }
        ]},
@@ -181,8 +186,6 @@ message_key_dispatch_validations_test() ->
 tcp_keepalive_validation_test_() ->
     ProducerConf = parse(kafka_producer_new_hocon()),
     ConsumerConf = parse(kafka_consumer_hocon()),
-    %% ensure atoms exist
-    _ = [my_producer, my_consumer],
     test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
         test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
@@ -188,8 +188,14 @@ hotconf_schema_json() ->
 
 %% TODO: move this function to emqx_dashboard when we stop generating this JSON at build time.
 bridge_schema_json() ->
-    SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>},
-    gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
+    Version = <<"0.1.0">>,
+    SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version},
+    put(emqx_bridge_schema_version, Version),
+    try
+        gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo)
+    after
+        erase(emqx_bridge_schema_version)
+    end.
 
 %% TODO: remove it and also remove hocon_md.erl and friends.
 %% markdown generation from schema is a failure and we are moving to an interactive
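Aside, not part of the patch: the put/try-after/erase sequence scopes the process-dictionary flag to the schema-generation call, so the flag cannot leak into later work on the same process even if gen_api_schema_json_iodata/2 throws. The generic shape of that pattern, in an illustrative module:

    -module(scoped_flag_sketch).
    -export([with_flag/3]).

    %% Run Fun with a process-dictionary flag set; always clear the flag
    %% afterwards, even if Fun raises.
    with_flag(Key, Value, Fun) ->
        put(Key, Value),
        try
            Fun()
        after
            erase(Key)
        end.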
@@ -453,9 +453,30 @@ update_connector(ConnectorType, ConnectorName, Conf) ->
     create_or_update_connector(ConnectorType, ConnectorName, Conf, 200).
 
 create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) ->
+    Check =
+        try
+            is_binary(ConnectorType) andalso emqx_resource:validate_type(ConnectorType),
+            ok = emqx_resource:validate_name(ConnectorName)
+        catch
+            throw:Error ->
+                ?BAD_REQUEST(map_to_json(Error))
+        end,
+    case Check of
+        ok ->
+            do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode);
+        BadRequest ->
+            BadRequest
+    end.
+
+do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) ->
     case emqx_connector:create(ConnectorType, ConnectorName, Conf) of
         {ok, _} ->
             lookup_from_all_nodes(ConnectorType, ConnectorName, HttpStatusCode);
+        {error, {PreOrPostConfigUpdate, _HandlerMod, Reason}} when
+            PreOrPostConfigUpdate =:= pre_config_update;
+            PreOrPostConfigUpdate =:= post_config_update
+        ->
+            ?BAD_REQUEST(map_to_json(redact(Reason)));
         {error, Reason} when is_map(Reason) ->
             ?BAD_REQUEST(map_to_json(redact(Reason)))
     end.
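Aside, not part of the patch: the new Check/case split runs name and type validation eagerly, turns a thrown validation error into the 400 response term, and only dispatches to the real create/update on a literal ok. The same shape in isolation; the module name and the {bad_request, _} stand-in for the ?BAD_REQUEST macro are illustrative:

    -module(check_then_sketch).
    -export([check_then/2]).

    %% Validate first; a thrown error becomes the early-return response term.
    check_then(Validate, Do) ->
        Check =
            try
                Validate()
            catch
                throw:Error -> {bad_request, Error}
            end,
        case Check of
            ok -> Do();
            BadRequest -> BadRequest
        end.

For example, check_then(fun() -> ok end, fun() -> created end) returns created, while a validator that throws returns {bad_request, Reason} without ever running the second fun.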
@@ -43,7 +43,7 @@ connector_structs() ->
    [
        {kafka_producer,
            mk(
-               hoconsc:map(name, ref(emqx_bridge_kafka, "config")),
+               hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
                #{
                    desc => <<"Kafka Connector Config">>,
                    required => false
@@ -283,13 +283,6 @@ config_enable.desc:
 config_enable.label:
 """Enable or Disable"""
 
-
-config_connector.desc:
-"""Reference to connector"""
-
-config_connector.label:
-"""Connector"""
-
 consumer_mqtt_payload.desc:
 """The template for transforming the incoming Kafka message. By default, it will use JSON format to serialize inputs from the Kafka message. Such fields are:
 <code>headers</code>: an object containing string key-value pairs.
@@ -316,10 +309,10 @@ kafka_consumer.label:
 """Kafka Consumer"""
 
 desc_config.desc:
-"""Configuration for a Kafka bridge."""
+"""Configuration for a Kafka Producer Client."""
 
 desc_config.label:
-"""Kafka Bridge Configuration"""
+"""Kafka Producer Client Configuration"""
 
 consumer_value_encoding_mode.desc:
 """Defines how the value from the Kafka message is encoded before being forwarded via MQTT.