Merge remote-tracking branch 'upstream/release-53' into 1114-sync-r53

This commit is contained in:
Ivan Dyachkov 2023-11-14 10:44:35 +01:00
commit 1c57993c91
35 changed files with 760 additions and 292 deletions

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.3.1-alpha.1").
-define(EMQX_RELEASE_CE, "5.3.1").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.3.1-alpha.5").
-define(EMQX_RELEASE_EE, "5.3.1").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -662,14 +662,32 @@ remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
undefined;
remove_from_override_config(BinKeyPath, Opts) ->
OldConf = emqx_config:read_override_conf(Opts),
emqx_utils_maps:deep_remove(BinKeyPath, OldConf).
UpgradedOldConf = upgrade_conf(OldConf),
emqx_utils_maps:deep_remove(BinKeyPath, UpgradedOldConf).
%% apply new config on top of override config
merge_to_override_config(_RawConf, #{persistent := false}) ->
undefined;
merge_to_override_config(RawConf, Opts) ->
OldConf = emqx_config:read_override_conf(Opts),
maps:merge(OldConf, RawConf).
UpgradedOldConf = upgrade_conf(OldConf),
maps:merge(UpgradedOldConf, RawConf).
upgrade_conf(Conf) ->
try
ConfLoader = emqx_app:get_config_loader(),
SchemaModule = apply(ConfLoader, schema_module, []),
apply(SchemaModule, upgrade_raw_conf, [Conf])
catch
ErrorType:Reason:Stack ->
?SLOG(warning, #{
msg => "failed_to_upgrade_config",
error_type => ErrorType,
reason => Reason,
stacktrace => Stack
}),
Conf
end.
up_req({remove, _Opts}) -> '$remove';
up_req({{update, Req}, _Opts}) -> Req.

View File

@ -674,7 +674,16 @@ t_multi_streams_packet_malform(Config) ->
?assert(is_list(emqtt:info(C))),
{error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>),
{error, stm_send_error, _} =
snabbkaffe:retry(
10000,
10,
fun() ->
{error, stm_send_error, _} = quicer:send(
MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>
)
end
),
?assert(is_list(emqtt:info(C))),

View File

@ -0,0 +1,199 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The module which knows everything about actions.
%% NOTE: it does not cover the V1 bridges.
-module(emqx_action_info).
-export([
action_type_to_connector_type/1,
action_type_to_bridge_v1_type/1,
bridge_v1_type_to_action_type/1,
is_action_type/1,
registered_schema_modules/0
]).
-callback bridge_v1_type_name() -> atom().
-callback action_type_name() -> atom().
-callback connector_type_name() -> atom().
-callback schema_module() -> atom().
-optional_callbacks([bridge_v1_type_name/0]).
%% ====================================================================
%% Hadcoded list of info modules for actions
%% TODO: Remove this list once we have made sure that all relevants
%% apps are loaded before this module is called.
%% ====================================================================
-if(?EMQX_RELEASE_EDITION == ee).
hard_coded_action_info_modules_ee() ->
[
emqx_bridge_kafka_action_info,
emqx_bridge_azure_event_hub_action_info
].
-else.
hard_coded_action_info_modules_ee() ->
[].
-endif.
hard_coded_action_info_modules_common() ->
[].
hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
%% ====================================================================
%% API
%% ====================================================================
action_type_to_connector_type(Type) when not is_atom(Type) ->
action_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
action_type_to_connector_type(Type) ->
ActionInfoMap = info_map(),
ActionTypeToConnectorTypeMap = maps:get(action_type_to_connector_type, ActionInfoMap),
case maps:get(Type, ActionTypeToConnectorTypeMap, undefined) of
undefined -> Type;
ConnectorType -> ConnectorType
end.
bridge_v1_type_to_action_type(Bin) when is_binary(Bin) ->
bridge_v1_type_to_action_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_action_type(Type) ->
ActionInfoMap = info_map(),
BridgeV1TypeToActionType = maps:get(bridge_v1_type_to_action_type, ActionInfoMap),
case maps:get(Type, BridgeV1TypeToActionType, undefined) of
undefined -> Type;
ActionType -> ActionType
end.
action_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
action_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
action_type_to_bridge_v1_type(Type) ->
ActionInfoMap = info_map(),
ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of
undefined -> Type;
BridgeV1Type -> BridgeV1Type
end.
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_action_type(Bin) when is_binary(Bin) ->
is_action_type(binary_to_existing_atom(Bin));
is_action_type(Type) ->
ActionInfoMap = info_map(),
ActionTypes = maps:get(action_type_names, ActionInfoMap),
case maps:get(Type, ActionTypes, undefined) of
undefined -> false;
_ -> true
end.
registered_schema_modules() ->
InfoMap = info_map(),
Schemas = maps:get(action_type_to_schema_module, InfoMap),
maps:to_list(Schemas).
%% ====================================================================
%% Internal functions for building the info map and accessing it
%% ====================================================================
internal_emqx_action_persistent_term_info_key() ->
?FUNCTION_NAME.
info_map() ->
case persistent_term:get(internal_emqx_action_persistent_term_info_key(), not_found) of
not_found ->
build_cache();
ActionInfoMap ->
ActionInfoMap
end.
build_cache() ->
ActionInfoModules = action_info_modules(),
ActionInfoMap =
lists:foldl(
fun(Module, InfoMapSoFar) ->
ModuleInfoMap = get_info_map(Module),
emqx_utils_maps:deep_merge(InfoMapSoFar, ModuleInfoMap)
end,
initial_info_map(),
ActionInfoModules
),
%% Update the persistent term with the new info map
persistent_term:put(internal_emqx_action_persistent_term_info_key(), ActionInfoMap),
ActionInfoMap.
action_info_modules() ->
ActionInfoModules = [
action_info_modules(App)
|| {App, _, _} <- application:loaded_applications()
],
lists:usort(lists:flatten(ActionInfoModules) ++ hard_coded_action_info_modules()).
action_info_modules(App) ->
case application:get_env(App, emqx_action_info_module) of
{ok, Module} ->
[Module];
_ ->
[]
end.
initial_info_map() ->
#{
action_type_names => #{},
bridge_v1_type_to_action_type => #{},
action_type_to_bridge_v1_type => #{},
action_type_to_connector_type => #{},
action_type_to_schema_module => #{}
}.
get_info_map(Module) ->
%% Force the module to get loaded
_ = code:ensure_loaded(Module),
ActionType = Module:action_type_name(),
BridgeV1Type =
case erlang:function_exported(Module, bridge_v1_type_name, 0) of
true ->
Module:bridge_v1_type_name();
false ->
Module:action_type_name()
end,
#{
action_type_names => #{
ActionType => true,
BridgeV1Type => true
},
bridge_v1_type_to_action_type => #{
BridgeV1Type => ActionType,
%% Alias the bridge V1 type to the action type
ActionType => ActionType
},
action_type_to_bridge_v1_type => #{
ActionType => BridgeV1Type
},
action_type_to_connector_type => #{
ActionType => Module:connector_type_name(),
%% Alias the bridge V1 type to the action type
BridgeV1Type => Module:connector_type_name()
},
action_type_to_schema_module => #{
ActionType => Module:schema_module()
}
}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.29"},
{vsn, "0.1.30"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -307,7 +307,7 @@ list() ->
emqx:get_raw_config([bridges], #{})
),
BridgeV2Bridges =
emqx_bridge_v2:list_and_transform_to_bridge_v1(),
emqx_bridge_v2:bridge_v1_list_and_transform(),
BridgeV1Bridges ++ BridgeV2Bridges.
%%BridgeV2Bridges = emqx_bridge_v2:list().
@ -318,7 +318,7 @@ lookup(Id) ->
lookup(Type, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
emqx_bridge_v2:lookup_and_transform_to_bridge_v1(Type, Name);
emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name);
false ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf)
@ -340,7 +340,7 @@ lookup(Type, Name, RawConf) ->
get_metrics(Type, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
emqx_bridge_v2:get_metrics(BridgeV2Type, Name);
@ -383,7 +383,7 @@ create(BridgeType0, BridgeName, RawConf) ->
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:split_bridge_v1_config_and_create(BridgeType, BridgeName, RawConf);
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf);
false ->
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],

View File

@ -627,7 +627,7 @@ create_bridge(BridgeType, BridgeName, Conf) ->
update_bridge(BridgeType, BridgeName, Conf) ->
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of
true ->
create_or_update_bridge(BridgeType, BridgeName, Conf, 200);
false ->
@ -1157,7 +1157,7 @@ map_to_json(M0) ->
end.
non_compat_bridge_msg() ->
<<"bridge already exists as non Bridge V1 compatible Bridge V2 bridge">>.
<<"bridge already exists as non Bridge V1 compatible action">>.
upgrade_type(Type) ->
emqx_bridge_lib:upgrade_type(Type).

View File

@ -53,20 +53,20 @@ maybe_withdraw_rule_action_loop([BridgeId | More], DeleteActions) ->
end.
%% @doc Kafka producer bridge renamed from 'kafka' to 'kafka_bridge' since 5.3.1.
upgrade_type(kafka) ->
kafka_producer;
upgrade_type(<<"kafka">>) ->
<<"kafka_producer">>;
upgrade_type(Other) ->
Other.
upgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type);
upgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type));
upgrade_type(Type) when is_list(Type) ->
atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))).
%% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
downgrade_type(kafka_producer) ->
kafka;
downgrade_type(<<"kafka_producer">>) ->
<<"kafka">>;
downgrade_type(Other) ->
Other.
downgrade_type(Type) when is_atom(Type) ->
emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type);
downgrade_type(Type) when is_binary(Type) ->
atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type));
downgrade_type(Type) when is_list(Type) ->
atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))).
%% A rule might be referencing an old version bridge type name
%% i.e. 'kafka' instead of 'kafka_producer' so we need to try both

View File

@ -130,7 +130,7 @@ reset_metrics(ResourceId) ->
false ->
emqx_resource:reset_metrics(ResourceId);
true ->
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(Type, Name) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
emqx_bridge_v2:reset_metrics(BridgeV2Type, Name);

View File

@ -40,6 +40,8 @@
list/0,
lookup/2,
create/3,
%% The remove/2 function is only for internal use as it may create
%% rules with broken dependencies
remove/2,
%% The following is the remove function that is called by the HTTP API
%% It also checks for rule action dependencies and optionally removes
@ -48,6 +50,7 @@
]).
%% Operations
-export([
disable_enable/3,
health_check/2,
@ -73,7 +76,8 @@
-export([
id/2,
id/3,
is_valid_bridge_v1/2
bridge_v1_is_valid/2,
extract_connector_id_from_bridge_v2_id/1
]).
%% Config Update Handler API
@ -88,18 +92,26 @@
import_config/1
]).
%% Compatibility API
%% Bridge V2 Types and Conversions
-export([
bridge_v2_type_to_connector_type/1,
is_bridge_v2_type/1,
lookup_and_transform_to_bridge_v1/2,
list_and_transform_to_bridge_v1/0,
is_bridge_v2_type/1
]).
%% Compatibility Layer API
%% All public functions for the compatibility layer should be prefixed with
%% bridge_v1_
-export([
bridge_v1_lookup_and_transform/2,
bridge_v1_list_and_transform/0,
bridge_v1_check_deps_and_remove/3,
split_bridge_v1_config_and_create/3,
bridge_v1_split_config_and_create/3,
bridge_v1_create_dry_run/2,
extract_connector_id_from_bridge_v2_id/1,
bridge_v1_type_to_bridge_v2_type/1,
%% Exception from the naming convention:
bridge_v2_type_to_bridge_v1_type/1,
bridge_v1_id_to_connector_resource_id/1,
bridge_v1_enable_disable/3,
bridge_v1_restart/2,
@ -107,6 +119,27 @@
bridge_v1_start/2
]).
%%====================================================================
%% Types
%%====================================================================
-type bridge_v2_info() :: #{
type := binary(),
name := binary(),
raw_config := map(),
resource_data := map(),
status := emqx_resource:resource_status(),
%% Explanation of the status if the status is not connected
error := term()
}.
-type bridge_v2_type() :: binary() | atom() | [byte()].
-type bridge_v2_name() :: binary() | atom() | [byte()].
%%====================================================================
%%====================================================================
%%====================================================================
%% Loading and unloading config when EMQX starts and stops
%%====================================================================
@ -157,6 +190,7 @@ unload_bridges() ->
%% CRUD API
%%====================================================================
-spec lookup(bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}.
lookup(Type, Name) ->
case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of
not_found ->
@ -191,8 +225,8 @@ lookup(Type, Name) ->
{disconnected, <<"Pending installation">>}
end,
{ok, #{
type => Type,
name => Name,
type => bin(Type),
name => bin(Name),
raw_config => RawConf,
resource_data => InstanceData,
status => DisplayBridgeV2Status,
@ -200,9 +234,12 @@ lookup(Type, Name) ->
}}
end.
-spec list() -> [bridge_v2_info()] | {error, term()}.
list() ->
list_with_lookup_fun(fun lookup/2).
-spec create(bridge_v2_type(), bridge_v2_name(), map()) ->
{ok, emqx_config:update_result()} | {error, any()}.
create(BridgeType, BridgeName, RawConf) ->
?SLOG(debug, #{
brige_action => create,
@ -217,9 +254,10 @@ create(BridgeType, BridgeName, RawConf) ->
#{override_to => cluster}
).
%% NOTE: This function can cause broken references but it is only called from
%% test cases.
-spec remove(atom() | binary(), binary()) -> ok | {error, any()}.
%% NOTE: This function can cause broken references from rules but it is only
%% called directly from test cases.
-spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
remove(BridgeType, BridgeName) ->
?SLOG(debug, #{
brige_action => remove,
@ -237,6 +275,7 @@ remove(BridgeType, BridgeName) ->
{error, Reason} -> {error, Reason}
end.
-spec check_deps_and_remove(bridge_v2_type(), bridge_v2_name(), boolean()) -> ok | {error, any()}.
check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActions) ->
AlsoDelete =
case AlsoDeleteActions of
@ -360,28 +399,6 @@ uninstall_bridge_v2(
%% Already not installed
ok;
uninstall_bridge_v2(
BridgeV2Type,
BridgeName,
Config
) ->
uninstall_bridge_v2_helper(
BridgeV2Type,
BridgeName,
combine_connector_and_bridge_v2_config(
BridgeV2Type,
BridgeName,
Config
)
).
uninstall_bridge_v2_helper(
_BridgeV2Type,
_BridgeName,
{error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
uninstall_bridge_v2_helper(
BridgeV2Type,
BridgeName,
#{connector := ConnectorName} = Config
@ -390,11 +407,16 @@ uninstall_bridge_v2_helper(
CreationOpts = emqx_resource:fetch_creation_opts(Config),
ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
ok = emqx_resource:clear_metrics(BridgeV2Id),
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id).
case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of
{error, _} ->
ok;
_CombinedConfig ->
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id)
end.
combine_connector_and_bridge_v2_config(
BridgeV2Type,
@ -425,6 +447,8 @@ combine_connector_and_bridge_v2_config(
%% Operations
%%====================================================================
-spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
{ok, any()} | {error, any()}.
disable_enable(Action, BridgeType, BridgeName) when
Action =:= disable; Action =:= enable
->
@ -502,6 +526,7 @@ connector_operation_helper_with_conf(
end
end.
-spec reset_metrics(bridge_v2_type(), bridge_v2_name()) -> ok | {error, not_found}.
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
@ -509,7 +534,9 @@ reset_metrics_helper(_Type, _Name, #{enable := false}) ->
ok;
reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id).
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id);
reset_metrics_helper(_, _, _) ->
{error, not_found}.
get_query_mode(BridgeV2Type, Config) ->
CreationOpts = emqx_resource:fetch_creation_opts(Config),
@ -517,6 +544,8 @@ get_query_mode(BridgeV2Type, Config) ->
ResourceType = emqx_connector_resource:connector_to_resource_type(ConnectorType),
emqx_resource:query_mode(ResourceType, Config, CreationOpts).
-spec send_message(bridge_v2_type(), bridge_v2_name(), Message :: term(), QueryOpts :: map()) ->
term() | {error, term()}.
send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup_conf(BridgeType, BridgeName) of
#{enable := true} = Config0 ->
@ -550,8 +579,7 @@ do_send_msg_with_enabled_config(
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
#{status := term(), error := term()} | {error, Reason :: term()}.
#{status := emqx_resource:resource_status(), error := term()} | {error, Reason :: term()}.
health_check(BridgeType, BridgeName) ->
case lookup_conf(BridgeType, BridgeName) of
#{
@ -570,6 +598,34 @@ health_check(BridgeType, BridgeName) ->
Error
end.
-spec create_dry_run(bridge_v2_type(), Config :: map()) -> ok | {error, term()}.
create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
TypeBin = bin(Type),
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
%% Check config
try
_ =
hocon_tconf:check_plain(
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
),
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end.
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = connector_type(BridgeType),
@ -601,33 +657,7 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
end,
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
create_dry_run(Type, Conf0) ->
Conf1 = maps:without([<<"name">>], Conf0),
TypeBin = bin(Type),
RawConf = #{<<"actions">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
%% Check config
try
_ =
hocon_tconf:check_plain(
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
),
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end.
-spec get_metrics(bridge_v2_type(), bridge_v2_name()) -> emqx_metrics_worker:metrics().
get_metrics(Type, Name) ->
emqx_resource:get_metrics(id(Type, Name)).
@ -796,15 +826,8 @@ connector_type(Type) ->
%% remote call so it can be mocked
?MODULE:bridge_v2_type_to_connector_type(Type).
bridge_v2_type_to_connector_type(Type) when not is_atom(Type) ->
bridge_v2_type_to_connector_type(binary_to_existing_atom(iolist_to_binary(Type)));
bridge_v2_type_to_connector_type(kafka) ->
%% backward compatible
kafka_producer;
bridge_v2_type_to_connector_type(kafka_producer) ->
kafka_producer;
bridge_v2_type_to_connector_type(azure_event_hub_producer) ->
azure_event_hub_producer.
bridge_v2_type_to_connector_type(Type) ->
emqx_action_info:action_type_to_connector_type(Type).
%%====================================================================
%% Data backup API
@ -1006,7 +1029,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
%%
%% * The corresponding bridge v2 should exist
%% * The connector for the bridge v2 should have exactly one channel
is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case lookup_conf(BridgeV2Type, BridgeName) of
{error, _} ->
@ -1024,35 +1047,21 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
end
end.
bridge_v1_type_to_bridge_v2_type(Bin) when is_binary(Bin) ->
?MODULE:bridge_v1_type_to_bridge_v2_type(binary_to_existing_atom(Bin));
bridge_v1_type_to_bridge_v2_type(kafka) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(kafka_producer) ->
kafka_producer;
bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) ->
azure_event_hub_producer.
bridge_v1_type_to_bridge_v2_type(Type) ->
emqx_action_info:bridge_v1_type_to_action_type(Type).
%% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
%% types. For everything else the function should return false.
is_bridge_v2_type(Atom) when is_atom(Atom) ->
is_bridge_v2_type(atom_to_binary(Atom, utf8));
is_bridge_v2_type(<<"kafka_producer">>) ->
true;
is_bridge_v2_type(<<"kafka">>) ->
true;
is_bridge_v2_type(<<"azure_event_hub_producer">>) ->
true;
is_bridge_v2_type(_) ->
false.
bridge_v2_type_to_bridge_v1_type(Type) ->
emqx_action_info:action_type_to_bridge_v1_type(Type).
list_and_transform_to_bridge_v1() ->
Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2),
is_bridge_v2_type(Type) ->
emqx_action_info:is_action_type(Type).
bridge_v1_list_and_transform() ->
Bridges = list_with_lookup_fun(fun bridge_v1_lookup_and_transform/2),
[B || B <- Bridges, B =/= not_bridge_v1_compatible_error()].
lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of
bridge_v1_lookup_and_transform(BridgeV1Type, Name) ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case lookup(Type, Name) of
@ -1060,7 +1069,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
ConnectorType = connector_type(Type),
case emqx_connector:lookup(ConnectorType, ConnectorName) of
{ok, Connector} ->
lookup_and_transform_to_bridge_v1_helper(
bridge_v1_lookup_and_transform_helper(
BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector
);
Error ->
@ -1076,7 +1085,7 @@ lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
not_bridge_v1_compatible_error() ->
{error, not_bridge_v1_compatible}.
lookup_and_transform_to_bridge_v1_helper(
bridge_v1_lookup_and_transform_helper(
BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector
) ->
ConnectorRawConfig1 = maps:get(raw_config, Connector),
@ -1129,7 +1138,7 @@ lookup_conf(Type, Name) ->
Config
end.
split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
%% Check if the bridge v2 exists
case lookup_conf(BridgeV2Type, BridgeName) of
@ -1140,7 +1149,7 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
BridgeV1Type, BridgeName, RawConf, PreviousRawConf
);
_Conf ->
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
true ->
%% Using remove + create as update, hence do not delete deps.
RemoveDeps = [],
@ -1375,7 +1384,7 @@ bridge_v1_id_to_connector_resource_id(BridgeId) ->
end.
bridge_v1_enable_disable(Action, BridgeType, BridgeName) ->
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeType, BridgeName) of
true ->
bridge_v1_enable_disable_helper(
Action,
@ -1420,7 +1429,7 @@ bridge_v1_start(BridgeV1Type, Name) ->
bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
case emqx_bridge_v2:bridge_v1_is_valid(BridgeV1Type, Name) of
true ->
connector_operation_helper_with_conf(
BridgeV2Type,

View File

@ -98,21 +98,11 @@ get_response_body_schema() ->
).
bridge_info_examples(Method) ->
maps:merge(
#{},
emqx_enterprise_bridge_examples(Method)
).
emqx_bridge_v2_schema:examples(Method).
bridge_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_bridge_examples(Method) ->
emqx_bridge_v2_enterprise:examples(Method).
-else.
emqx_enterprise_bridge_examples(_Method) -> #{}.
-endif.
param_path_id() ->
{id,
mk(
@ -718,7 +708,13 @@ node_status(Bridges) ->
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
HeadVal = maps:get(status, Head(AllStatus), connecting),
AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus),
AllRes = lists:all(
fun
(#{status := Val}) -> Val == HeadVal;
(_) -> false
end,
AllStatus
),
case AllRes of
true -> HeadVal;
false -> inconsistent
@ -795,8 +791,6 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
PreOrPostConfigUpdate =:= pre_config_update;
PreOrPostConfigUpdate =:= post_config_update
->
?BAD_REQUEST(map_to_json(redact(Reason)));
{error, Reason} ->
?BAD_REQUEST(map_to_json(redact(Reason)))
end.

View File

@ -82,6 +82,11 @@ schema_modules() ->
].
examples(Method) ->
ActionExamples = emqx_bridge_v2_schema:examples(Method),
RegisteredExamples = registered_examples(Method),
maps:merge(ActionExamples, RegisteredExamples).
registered_examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)

View File

@ -1,68 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_enterprise).
-if(?EMQX_RELEASE_EDITION == ee).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
api_schemas/1,
examples/1,
fields/1
]).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_bridge_kafka,
emqx_bridge_azure_event_hub
].
fields(actions) ->
action_structs().
action_structs() ->
[
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Actions Config">>,
required => false
}
)},
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)),
#{
desc => <<"Azure Event Hub Actions Config">>,
required => false
}
)}
].
api_schemas(Method) ->
[
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
-else.
-endif.

View File

@ -27,51 +27,24 @@
-export([
get_response/0,
put_request/0,
post_request/0
post_request/0,
examples/1
]).
%% Exported for mocking
%% TODO: refactor emqx_bridge_v1_compatibility_layer_SUITE so we don't need to
%% export this
-export([
registered_api_schemas/1
]).
-export([types/0, types_sc/0]).
-export([enterprise_api_schemas/1]).
-export_type([action_type/0]).
%% Should we explicitly list them here so dialyzer may be more helpful?
-type action_type() :: atom().
-if(?EMQX_RELEASE_EDITION == ee).
-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
Method :: string().
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_v2_enterprise, api_schemas, 1) of
true -> emqx_bridge_v2_enterprise:api_schemas(Method);
false -> []
end.
enterprise_fields_actions() ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_v2_enterprise, fields, 1) of
true ->
emqx_bridge_v2_enterprise:fields(actions);
false ->
[]
end.
-else.
-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
Method :: string().
enterprise_api_schemas(_Method) -> [].
enterprise_fields_actions() -> [].
-endif.
%%======================================================================================
%% For HTTP APIs
get_response() ->
@ -84,8 +57,18 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
EE = ?MODULE:enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(EE)).
APISchemas = ?MODULE:registered_api_schemas(Method),
hoconsc:union(bridge_api_union(APISchemas)).
registered_api_schemas(Method) ->
RegisteredSchemas = emqx_action_info:registered_schema_modules(),
[
api_ref(SchemaModule, atom_to_binary(BridgeV2Type), Method ++ "_bridge_v2")
|| {BridgeV2Type, SchemaModule} <- RegisteredSchemas
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
@ -133,7 +116,13 @@ roots() ->
end.
fields(actions) ->
[] ++ enterprise_fields_actions().
registered_schema_fields().
registered_schema_fields() ->
[
Module:fields(action)
|| {_BridgeV2Type, Module} <- emqx_action_info:registered_schema_modules()
].
desc(actions) ->
?DESC("desc_bridges_v2");
@ -148,6 +137,19 @@ types() ->
types_sc() ->
hoconsc:enum(types()).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
lists:foldl(Fun, #{}, SchemaModules).
-ifdef(TEST).
-include_lib("hocon/include/hocon_types.hrl").
schema_homogeneous_test() ->

View File

@ -111,7 +111,7 @@ setup_mocks() ->
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
meck:expect(
emqx_bridge_v2_schema,
enterprise_api_schemas,
registered_api_schemas,
1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
),

View File

@ -264,17 +264,17 @@ t_create_dry_run_connector_does_not_exist(_) ->
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).
t_is_valid_bridge_v1(_) ->
t_bridge_v1_is_valid(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
%% Add another channel/bridge to the connector
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()),
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
false = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge_2),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
%% Non existing bridge is a valid Bridge V1
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
ok.
t_manual_health_check(_) ->
@ -647,10 +647,12 @@ t_load_config_success(_Config) ->
{ok, _},
update_root_config(RootConf0)
),
BridgeTypeBin = bin(BridgeType),
BridgeNameBin = bin(BridgeName),
?assertMatch(
{ok, #{
type := BridgeType,
name := BridgeName,
type := BridgeTypeBin,
name := BridgeNameBin,
raw_config := #{},
resource_data := #{}
}},
@ -665,8 +667,8 @@ t_load_config_success(_Config) ->
),
?assertMatch(
{ok, #{
type := BridgeType,
name := BridgeName,
type := BridgeTypeBin,
name := BridgeNameBin,
raw_config := #{<<"some_key">> := <<"new_value">>},
resource_data := #{}
}},
@ -860,3 +862,7 @@ wait_until(Fun, Timeout) when Timeout >= 0 ->
end;
wait_until(_, _) ->
ct:fail("Wait until event did not happen").
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -177,7 +177,9 @@ all() ->
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [
t_bridges_probe
t_bridges_probe,
t_broken_bridge_config,
t_fix_broken_bridge_config
],
ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics
@ -551,6 +553,117 @@ t_bridges_lifecycle(Config) ->
{ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config),
ok.
t_broken_bridge_config(Config) ->
emqx_cth_suite:stop_apps([emqx_bridge]),
BridgeName = ?BRIDGE_NAME,
StartOps =
#{
config =>
"actions {\n"
" "
?BRIDGE_TYPE_STR
" {\n"
" " ++ binary_to_list(BridgeName) ++
" {\n"
" connector = does_not_exist\n"
" enable = true\n"
" kafka {\n"
" topic = test-topic-one-partition\n"
" }\n"
" local_topic = \"mqtt/local/topic\"\n"
" resource_opts {health_check_interval = 32s}\n"
" }\n"
" }\n"
"}\n"
"\n",
schema_mod => emqx_bridge_v2_schema
},
emqx_cth_suite:start_app(emqx_bridge, StartOps),
?assertMatch(
{ok, 200, [
#{
<<"name">> := BridgeName,
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
}
]},
request_json(get, uri([?ROOT]), Config)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
?assertEqual(
{ok, 204, <<>>},
request(delete, uri([?ROOT, BridgeID]), Config)
),
?assertEqual(
{ok, 200, []},
request_json(get, uri([?ROOT]), Config)
),
ok.
t_fix_broken_bridge_config(Config) ->
emqx_cth_suite:stop_apps([emqx_bridge]),
BridgeName = ?BRIDGE_NAME,
StartOps =
#{
config =>
"actions {\n"
" "
?BRIDGE_TYPE_STR
" {\n"
" " ++ binary_to_list(BridgeName) ++
" {\n"
" connector = does_not_exist\n"
" enable = true\n"
" kafka {\n"
" topic = test-topic-one-partition\n"
" }\n"
" local_topic = \"mqtt/local/topic\"\n"
" resource_opts {health_check_interval = 32s}\n"
" }\n"
" }\n"
"}\n"
"\n",
schema_mod => emqx_bridge_v2_schema
},
emqx_cth_suite:start_app(emqx_bridge, StartOps),
?assertMatch(
{ok, 200, [
#{
<<"name">> := BridgeName,
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
}
]},
request_json(get, uri([?ROOT]), Config)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
request_json(
put,
uri([?ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME),
Config
),
?assertMatch(
{ok, 200, #{
<<"connector">> := ?CONNECTOR_NAME,
<<"status">> := <<"connected">>
}},
request_json(get, uri([?ROOT, BridgeID]), Config)
),
ok.
t_start_bridge_unknown_node(Config) ->
{ok, 404, _} =
request(

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.3"},
{vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,

View File

@ -114,6 +114,15 @@ fields(kafka_message) ->
Fields0 = emqx_bridge_kafka:fields(kafka_message),
Fields = proplists:delete(timestamp, Fields0),
override_documentations(Fields);
fields(action) ->
{azure_event_hub_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, actions)),
#{
desc => <<"Azure Event Hub Actions Config">>,
required => false
}
)};
fields(actions) ->
Fields =
override(
@ -162,7 +171,7 @@ bridge_v2_examples(Method) ->
[
#{
?AEH_CONNECTOR_TYPE_BIN => #{
summary => <<"Azure Event Hub Bridge v2">>,
summary => <<"Azure Event Hub Action">>,
value => values({Method, bridge_v2})
}
}
@ -207,7 +216,7 @@ values({post, bridge_v2}) ->
#{
enable => true,
connector => <<"my_azure_event_hub_producer_connector">>,
name => <<"my_azure_event_hub_producer_bridge">>,
name => <<"my_azure_event_hub_producer_action">>,
type => ?AEH_CONNECTOR_TYPE_BIN
}
);

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_event_hub_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> azure_event_hub_producer.
action_type_name() -> azure_event_hub_producer.
connector_type_name() -> azure_event_hub_producer.
schema_module() -> emqx_bridge_azure_event_hub.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.11"},
{vsn, "0.1.12"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,
@ -12,7 +12,7 @@
brod,
brod_gssapi
]},
{env, []},
{env, [{emqx_action_info_module, emqx_bridge_kafka_action_info}]},
{modules, []},
{links, []}

View File

@ -100,7 +100,7 @@ values({post, connector}) ->
values({post, KafkaType}) ->
maps:merge(
#{
name => <<"my_kafka_producer_bridge">>,
name => <<"my_kafka_producer_action">>,
type => <<"kafka_producer">>
},
values({put, KafkaType})
@ -526,7 +526,18 @@ fields(consumer_kafka_opts) ->
fields(resource_opts) ->
SupportedFields = [health_check_interval],
CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts);
fields(action_field) ->
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)),
#{
desc => <<"Kafka Producer Action Config">>,
required => false
}
)};
fields(action) ->
fields(action_field).
desc("config_connector") ->
?DESC("desc_config");

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> kafka.
action_type_name() -> kafka_producer.
connector_type_name() -> kafka_producer.
schema_module() -> emqx_bridge_kafka.

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.30"},
{vsn, "0.1.31"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -151,6 +151,9 @@ reset(Node, KeyPath, Opts) ->
%% @doc Called from build script.
%% TODO: move to a external escript after all refactoring is done
dump_schema(Dir, SchemaModule) ->
%% TODO: Load all apps instead of only emqx_dashboard
%% as this will help schemas that searches for apps with
%% relevant schema definitions
_ = application:load(emqx_dashboard),
ok = emqx_dashboard_desc_cache:init(),
lists:foreach(

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.24"},
{vsn, "0.1.25"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [

View File

@ -447,7 +447,7 @@ health_check(ResId) ->
emqx_resource_manager:health_check(ResId).
-spec channel_health_check(resource_id(), channel_id()) ->
#{status := channel_status(), error := term(), any() => any()}.
#{status := resource_status(), error := term()}.
channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId).

View File

@ -309,7 +309,7 @@ health_check(ResId) ->
safe_call(ResId, health_check, ?T_OPERATION).
-spec channel_health_check(resource_id(), channel_id()) ->
#{status := channel_status(), error := term(), any() => any()}.
#{status := resource_status(), error := term()}.
channel_health_check(ResId, ChannelId) ->
%% Do normal health check first to trigger health checks for channels
%% and update the cached health status for the channels

View File

@ -311,6 +311,15 @@ t_rule_engine(_) ->
{400, _} = emqx_rule_engine_api:'/rule_engine'(put, #{body => #{<<"something">> => <<"weird">>}}).
t_downgrade_bridge_type(_) ->
case emqx_release:edition() of
ee ->
do_test_downgrade_bridge_type();
ce ->
%% downgrade is not supported in CE
ok
end.
do_test_downgrade_bridge_type() ->
#{id := RuleId} = create_rule((?SIMPLE_RULE(<<>>))#{<<"actions">> => [<<"kafka:name">>]}),
?assertMatch(
%% returns a bridges_v2 ID

58
changes/e5.3.1.en.md Normal file
View File

@ -0,0 +1,58 @@
# e5.3.1
## Enhancements
- [#11637](https://github.com/emqx/emqx/pull/11637) Added extra diagnostic checks to help debug issues when mnesia is stuck waiting for tables. Library Updates: `ekka` has been upgraded to version 0.15.15, and `mria` to version 0.6.4.
- [#11581](https://github.com/emqx/emqx/pull/11581) Feature Preview: Planned for EMQX v5.4.0, introducing the concepts of *Connector* and *Action* base on data bridge. The existing data bridge will be gradually migrated to Connector and Action. Connector are designed to manage the integration with external systems, while Actions are solely used to configure the data processing methods. Connector can be reused across multiple Actions, providing greater flexibility and scalability. Currently, the migration has been completed for Kafka producer and Azure Event Hub producer.
- The Dashboard now supports MQTT 5.0 publish attribute settings for the rule engine's message republish action, allowing users more flexibility in publishing messages.
## Bug Fixes
- [#11565](https://github.com/emqx/emqx/pull/11565) Upgraded jq library from v0.3.10 to v0.3.11. In this version, jq_port programs are initiated on-demand and will not appear in users' processes unless the jq function in EMQX is used. Additionally, idle jq_port programs will auto-terminate after a set period. Note: Most EMQX users are running jq in NIF mode and will not be affected by this update.
- [#11676](https://github.com/emqx/emqx/pull/11676) Hid a few pieces of sensitive information from debug-level logs.
- [#11697](https://github.com/emqx/emqx/pull/11697) Disabled outdated TLS versions and cipher suites in the EMQX backplane network (`gen_rpc`). Added support for tlsv1.3 on the backplane and introduced new configuration parameters: `EMQX_RPC__TLS_VERSIONS` and `EMQX_RPC__CIPHERS`.
- [#11734](https://github.com/emqx/emqx/pull/11734) Fixed clustering in IPv6 network. Added new configurations `rpc.listen_address` and `rpc.ipv6_only` to allow EMQX cluster RPC server and client to use IPv6.
- [#11747](https://github.com/emqx/emqx/pull/11747) Updated QUIC stack to msquic 2.2.3.
- [#11796](https://github.com/emqx/emqx/pull/11796) Fixed rpc schema to ensure that client/server uses same transport driver.
- [#11798](https://github.com/emqx/emqx/pull/11798) Fixed the issue where the node could not start after executing `./bin/emqx data import [FILE]`.
The connection between `apikey_key` and `apikey_name` is also enhanced for better consistency and unique identification.
- `apikey_key`: When generating an API key via the dashboard, `apikey_key` will now create a unique value derived from the provided human-readable `apikey_name`.
- `apikey_name` Conversely, when using a bootstrap file to generate an API key, `apikey_name` will be generated as a unique value based on the associated `apikey_key`.
- [#11813](https://github.com/emqx/emqx/pull/11813) Fixed the schema to ensure that RPC client SSL port aligns with the configured server port. This fix also guarantees that the RPC ports are correctly opened in the Helm chart.
- [#11819](https://github.com/emqx/emqx/pull/11819) Upgraded opentelemetry library to v1.3.1-emqx. This opentelemetry release fixes invalid metrics timestamps in the exported metrics.
- [#11861](https://github.com/emqx/emqx/pull/11861) Fix excessive warning message print in remote console shell.
- [#11722](https://github.com/emqx/emqx/pull/11722) Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.
- [#11724](https://github.com/emqx/emqx/pull/11724) Fixed a metrics-related issue where messages sent to Kafka would be counted as failed even when they were successfully transmitted afterward due to internal buffering.
- [#11728](https://github.com/emqx/emqx/pull/11728) Enhanced the LDAP filter string parser with the following improvements:
- Automatic escaping of special characters within filter strings.
- Fixed a bug that previously prevented the use of `dn` as a filter value.
- [#11733](https://github.com/emqx/emqx/pull/11733) Resolved an incompatibility issue that caused crashes during session takeover or channel eviction when the session was located on a remote node running EMQX v5.2.x or an earlier version.
- [#11750](https://github.com/emqx/emqx/pull/11750) Eliminated logging and tracing of HTTP request bodies in HTTP authentification and HTTP bridges.
- [#11760](https://github.com/emqx/emqx/pull/11760) Simplified the CQL query used for the Cassandra bridge health check, which was previously generating warnings in the Cassandra server logs.
- [#11886](https://github.com/emqx/emqx/pull/11886) Fixed backward plugin compatibility.
Currently, EMQX validates hookpoint names, so invalid hookspoints cannot be used for registering hooks. However, older versions of plugin templates used some misspelled hookpoints, and so could the real plugins. We allow the old hookpoints to be used for registering hooks, but issue a warning that they are deprecated. As before, these hooks are never called.
- [#11897](https://github.com/emqx/emqx/pull/11897) Fix config sync wait-loop race condition when cluster nodes boot around the same time.
## Breaking Changes

47
changes/v5.3.1.en.md Normal file
View File

@ -0,0 +1,47 @@
# v5.3.1
## Enhancements
- [#11637](https://github.com/emqx/emqx/pull/11637) Added extra diagnostic checks to help debug issues when mnesia is stuck waiting for tables. Library Updates: `ekka` has been upgraded to version 0.15.15, and `mria` to version 0.6.4.
## Bug Fixes
- [#11565](https://github.com/emqx/emqx/pull/11565) Upgraded jq library from v0.3.10 to v0.3.11. In this version, jq_port programs are initiated on-demand and will not appear in users' processes unless the jq function in EMQX is used. Additionally, idle jq_port programs will auto-terminate after a set period. Note: Most EMQX users are running jq in NIF mode and will not be affected by this update.
- [#11676](https://github.com/emqx/emqx/pull/11676) Hid a few pieces of sensitive information from debug-level logs.
- [#11697](https://github.com/emqx/emqx/pull/11697) Disabled outdated TLS versions and cipher suites in the EMQX backplane network (`gen_rpc`). Added support for tlsv1.3 on the backplane and introduced new configuration parameters: `EMQX_RPC__TLS_VERSIONS` and `EMQX_RPC__CIPHERS`.
The corresponding `gen_rpc` PR: https://github.com/emqx/gen_rpc/pull/36
- [#11734](https://github.com/emqx/emqx/pull/11734) Fixed clustering in IPv6 network. Added new configurations `rpc.listen_address` and `rpc.ipv6_only` to allow EMQX cluster RPC server and client to use IPv6.
- [#11747](https://github.com/emqx/emqx/pull/11747) Updated QUIC stack to msquic 2.2.3.
- [#11796](https://github.com/emqx/emqx/pull/11796) Fixed rpc schema to ensure that client/server uses same transport driver.
- [#11798](https://github.com/emqx/emqx/pull/11798) Fixed the issue where the node could not start after executing `./bin/emqx data import [FILE]`.
The connection between `apikey_key` and `apikey_name` is also enhanced for better consistency and unique identification.
- `apikey_key`: When generating an API key via the dashboard, `apikey_key` will now create a unique value derived from the provided human-readable `apikey_name`.
- `apikey_name` Conversely, when using a bootstrap file to generate an API key, `apikey_name` will be generated as a unique value based on the associated `apikey_key`.
- [#11813](https://github.com/emqx/emqx/pull/11813) Fixed the schema to ensure that RPC client SSL port aligns with the configured server port. This fix also guarantees that the RPC ports are correctly opened in the Helm chart.
- [#11819](https://github.com/emqx/emqx/pull/11819) Upgraded opentelemetry library to v1.3.1-emqx. This opentelemetry release fixes invalid metrics timestamps in the exported metrics.
- [#11861](https://github.com/emqx/emqx/pull/11861) Fixed excessive warning message printed in remote console shell.
- [#11733](https://github.com/emqx/emqx/pull/11733) Resolved an incompatibility issue that caused crashes during session takeover or channel eviction when the session was located on a remote node running EMQX v5.2.x or an earlier version.
- [#11750](https://github.com/emqx/emqx/pull/11750) Eliminated logging and tracing of HTTP request bodies in HTTP authentification and HTTP bridges.
- [#11886](https://github.com/emqx/emqx/pull/11886) Fixed backward plugin compatibility.
Currently, EMQX validates hookpoint names, so invalid hookspoints cannot be used for registering hooks. However, older versions of plugin templates used some misspelled hookpoints, and so could the real plugins. We allow the old hookpoints to be used for registering hooks, but issue a warning that they are deprecated. As before, these hooks are never called.
- [#11897](https://github.com/emqx/emqx/pull/11897) Fix config sync wait-loop race condition when cluster nodes boot around the same time.
## Breaking Changes

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.3.1-alpha.5
version: 5.3.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.3.1-alpha.5
appVersion: 5.3.1

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.3.1-alpha.1
version: 5.3.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.3.1-alpha.1
appVersion: 5.3.1

10
mix.exs
View File

@ -102,31 +102,31 @@ defmodule EMQXUmbrella.MixProject do
{:opentelemetry_api,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_api",
tag: "v1.3.1-emqx",
tag: "v1.3.2-emqx",
override: true,
runtime: false},
{:opentelemetry,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry",
tag: "v1.3.1-emqx",
tag: "v1.3.2-emqx",
override: true,
runtime: false},
{:opentelemetry_api_experimental,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_api_experimental",
tag: "v1.3.1-emqx",
tag: "v1.3.2-emqx",
override: true,
runtime: false},
{:opentelemetry_experimental,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_experimental",
tag: "v1.3.1-emqx",
tag: "v1.3.2-emqx",
override: true,
runtime: false},
{:opentelemetry_exporter,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_exporter",
tag: "v1.3.1-emqx",
tag: "v1.3.2-emqx",
override: true,
runtime: false}
] ++

View File

@ -85,13 +85,13 @@
, {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}}
, {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}}
%% trace
, {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.1-emqx"}, "apps/opentelemetry_api"}}
, {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.1-emqx"}, "apps/opentelemetry"}}
, {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_api"}}
, {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry"}}
%% log metrics
, {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.1-emqx"}, "apps/opentelemetry_experimental"}}
, {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.1-emqx"}, "apps/opentelemetry_api_experimental"}}
, {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_experimental"}}
, {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_api_experimental"}}
%% export
, {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.1-emqx"}, "apps/opentelemetry_exporter"}}
, {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_exporter"}}
]}.
{xref_ignores,