Merge pull request #10252 from emqx/release-50

0327 merge release-50 to master
This commit is contained in:
Zaiming (Stone) Shi 2023-03-29 12:33:17 +02:00 committed by GitHub
commit 80eb9d7542
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 240 additions and 31 deletions

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.0.21").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.2-rc.1").
-define(EMQX_RELEASE_EE, "5.0.2-rc.2").
%% the HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_BRIDGE_RESOURCE_HRL).
-define(EMQX_BRIDGE_RESOURCE_HRL, true).
-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>).
-endif.

View File

@ -15,6 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_resource).
-include("emqx_bridge_resource.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
@ -23,7 +24,9 @@
resource_id/1,
resource_id/2,
bridge_id/2,
parse_bridge_id/1
parse_bridge_id/1,
bridge_hookpoint/1,
bridge_hookpoint_to_bridge_id/1
]).
-export([
@ -89,6 +92,14 @@ parse_bridge_id(BridgeId) ->
)
end.
bridge_hookpoint(BridgeId) ->
<<"$bridges/", (bin(BridgeId))/binary>>.
bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
{ok, BridgeId};
bridge_hookpoint_to_bridge_id(_) ->
{error, bad_bridge_hookpoint}.
validate_name(Name0) ->
Name = unicode:characters_to_list(Name0, utf8),
case is_list(Name) andalso Name =/= [] of
@ -308,7 +319,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
%% receives a message from the external database.
BId = bridge_id(Type, Name),
Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name};
BridgeHookpoint = bridge_hookpoint(BId),
Conf#{hookpoint => BridgeHookpoint, bridge_name => Name};
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
parse_confs(<<"kafka">> = _Type, Name, Conf) ->

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_resource_tests).
-include_lib("eunit/include/eunit.hrl").
bridge_hookpoint_test_() ->
BridgeId = emqx_bridge_resource:bridge_id(type, name),
BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId),
[
?_assertEqual(<<"$bridges/type:name">>, BridgeHookpoint),
?_assertEqual(
{ok, BridgeId},
emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeHookpoint)
),
?_assertEqual(
{error, bad_bridge_hookpoint},
emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeId)
)
].

View File

@ -213,11 +213,12 @@ get_rules_with_same_event(Topic) ->
].
-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
get_rule_ids_by_action(ActionName) when is_binary(ActionName) ->
get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) ->
[
Id
|| #{actions := Acts, id := Id} <- get_rules(),
lists:any(fun(A) -> A =:= ActionName end, Acts)
|| #{actions := Acts, id := Id, from := Froms} <- get_rules(),
forwards_to_bridge(Acts, BridgeId) orelse
references_ingress_bridge(Froms, BridgeId)
];
get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
{Mod, Fun} =
@ -317,8 +318,14 @@ get_basic_usage_info() ->
NumRules = length(EnabledRules),
ReferencedBridges =
lists:foldl(
fun(#{actions := Actions, from := From}, Acc) ->
BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From],
fun(#{actions := Actions, from := Froms}, Acc) ->
BridgeIDs0 =
[
BridgeID
|| From <- Froms,
{ok, BridgeID} <-
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
],
BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
end,
@ -478,3 +485,19 @@ contains_actions(Actions, Mod0, Func0) ->
end,
Actions
).
forwards_to_bridge(Actions, BridgeId) ->
lists:any(fun(A) -> A =:= BridgeId end, Actions).
references_ingress_bridge(Froms, BridgeId) ->
lists:any(
fun(ReferenceBridgeId) ->
BridgeId =:= ReferenceBridgeId
end,
[
RefBridgeId
|| From <- Froms,
{ok, RefBridgeId} <-
[emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)]
]
).

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx_bridge/include/emqx_bridge_resource.hrl").
-export([
reload/0,
@ -1011,7 +1012,7 @@ hook_fun_name(HookPoint) ->
HookFunName.
%% return static function references to help static code checks
hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2;
hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2;
hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
@ -1034,7 +1035,7 @@ ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge;
event_name(<<"$events/client_connected">>) -> 'client.connected';
event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
event_name(<<"$events/client_connack">>) -> 'client.connack';
@ -1047,7 +1048,7 @@ event_name(<<"$events/message_dropped">>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
event_name(_) -> 'message.publish'.
event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge;
event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
event_topic('client.connack') -> <<"$events/client_connack">>;

View File

@ -25,6 +25,8 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
-define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)).
@ -198,8 +200,11 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(t_events, Config) ->
ets:delete(events_record_tab),
ok = delete_rule(?config(hook_points_rules, Config));
ok = delete_rule(?config(hook_points_rules, Config)),
emqx_common_test_helpers:call_janitor(),
ok;
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
ok.
%%------------------------------------------------------------------------------
@ -2683,6 +2688,24 @@ t_get_basic_usage_info_1(_Config) ->
),
ok.
t_get_rule_ids_by_action_reference_ingress_bridge(_Config) ->
BridgeId = <<"mqtt:ingress">>,
RuleId = <<"rule:ingress_bridge_referenced">>,
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => RuleId,
sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>,
actions => [#{function => console}]
}
),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
?assertMatch(
[RuleId],
emqx_rule_engine:get_rule_ids_by_action(BridgeId)
),
ok.
%%------------------------------------------------------------------------------
%% Internal helpers
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,3 @@
Consider bridges referenced in `FROM` rule clauses as dependencies.
Before this fix, when one tried to delete an ingress rule referenced in an action like `select * from "$bridges/mqtt:ingress"`, the UI would not trigger a warning about dependent rule actions.

View File

@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# - 11883 port for internal MQTT/TCP
# - 18083 for dashboard and API
# - 4370 default Erlang distribution port
# - 5369 for backplain gen_rpc
# - 5369 for backplane gen_rpc
EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]

View File

@ -599,10 +599,10 @@ emqx_ee_bridge_kafka {
}
consumer_offset_reset_policy {
desc {
en: "Defines how the consumers should reset the start offset when "
"a topic partition has and invalid or no initial offset."
zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时"
"消费者应如何重置开始偏移量。"
en: "Defines from which offset a consumer should start fetching when there"
" is no commit history or when the commit history becomes invalid."
zh: "当没有主题分区没有偏移量的历史记录,或则历史记录失效后"
"消费者应该使用哪个偏移量重新开始消费"
}
label {
en: "Offset Reset Policy"

View File

@ -105,7 +105,7 @@ values(consumer) ->
#{
kafka => #{
max_batch_bytes => <<"896KB">>,
offset_reset_policy => <<"reset_to_latest">>,
offset_reset_policy => <<"latest">>,
offset_commit_interval_seconds => 5
},
key_encoding_mode => <<"none">>,
@ -370,8 +370,8 @@ fields(consumer_kafka_opts) ->
})},
{offset_reset_policy,
mk(
enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
#{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
enum([latest, earliest]),
#{default => latest, desc => ?DESC(consumer_offset_reset_policy)}
)},
{offset_commit_interval_seconds,
mk(

View File

@ -59,8 +59,7 @@
subscriber_id := subscriber_id(),
kafka_client_id := brod:client_id()
}.
-type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
%% -type mqtt_payload() :: full_message | message_value.
-type offset_reset_policy() :: latest | earliest.
-type encoding_mode() :: none | base64.
-type consumer_init_data() :: #{
hookpoint := binary(),
@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) ->
max_batch_bytes := MaxBatchBytes,
max_rejoin_attempts := MaxRejoinAttempts,
offset_commit_interval_seconds := OffsetCommitInterval,
offset_reset_policy := OffsetResetPolicy
offset_reset_policy := OffsetResetPolicy0
},
key_encoding_mode := KeyEncodingMode,
topic_mapping := TopicMapping0,
@ -290,7 +289,15 @@ start_consumer(Config, InstanceId, ClientID) ->
%% cluster, so that the load gets distributed between all
%% consumers and we don't repeat messages in the same cluster.
GroupID = consumer_group_id(BridgeName),
%% earliest or latest
BeginOffset = OffsetResetPolicy0,
OffsetResetPolicy =
case OffsetResetPolicy0 of
latest -> reset_to_latest;
earliest -> reset_to_earliest
end,
ConsumerConfig = [
{begin_offset, BeginOffset},
{max_bytes, MaxBatchBytes},
{offset_reset_policy, OffsetResetPolicy}
],

View File

@ -7,6 +7,7 @@
%% callbacks of behaviour emqx_resource
-export([
is_buffer_supported/0,
callback_mode/0,
on_start/2,
on_stop/2,
@ -26,6 +27,8 @@
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka).
is_buffer_supported() -> true.
callback_mode() -> async_if_possible.
%% @doc Config schema is defined in emqx_ee_bridge_kafka.

View File

@ -53,6 +53,7 @@ sasl_only_tests() ->
%% tests that do not need to be run on all groups
only_once_tests() ->
[
t_begin_offset_earliest,
t_bridge_rule_action_source,
t_cluster_group,
t_node_joins_existing_cluster,
@ -274,7 +275,18 @@ init_per_testcase(TestCase, Config) when
[{skip_does_not_apply, true}]
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_failed_creation_then_fixed;
TestCase =:= t_failed_creation_then_fixed
->
%% test with one partiton only for this case because
%% the wait probe may not be always sent to the same partition
HasProxy = proplists:get_value(has_proxy, Config, true),
case HasProxy of
false ->
[{skip_does_not_apply, true}];
true ->
common_init_per_testcase(TestCase, [{num_partitions, 1} | Config])
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_on_get_status;
TestCase =:= t_receive_after_recovery
->
@ -574,7 +586,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
" max_rejoin_attempts = 5\n"
" offset_commit_interval_seconds = 3\n"
%% todo: matrix this
" offset_reset_policy = reset_to_latest\n"
" offset_reset_policy = latest\n"
" }\n"
"~s"
" key_encoding_mode = none\n"
@ -1736,7 +1748,18 @@ t_node_joins_existing_cluster(Config) ->
setup_group_subscriber_spy(N1),
{{ok, _}, {ok, _}} =
?wait_async_action(
erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end),
erpc:call(N1, fun() ->
{ok, _} = create_bridge(
Config,
#{
<<"kafka">> =>
#{
<<"offset_reset_policy">> =>
<<"earliest">>
}
}
)
end),
#{?snk_kind := kafka_consumer_subscriber_started},
15_000
),
@ -1767,14 +1790,19 @@ t_node_joins_existing_cluster(Config) ->
wait_for_cluster_rpc(N2),
{ok, _} = snabbkaffe:receive_events(SRef0),
?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])),
?retry(
_Sleep1 = 100,
_Attempts1 = 50,
?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId]))
),
%% Give some time for the consumers in both nodes to
%% rebalance.
{ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
%% Publish some messages so we can check they came from each node.
?retry(
_Sleep1 = 100,
_Attempts1 = 50,
_Sleep2 = 100,
_Attempts2 = 50,
true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
),
{ok, SRef1} =
@ -1784,7 +1812,7 @@ t_node_joins_existing_cluster(Config) ->
?snk_span := {complete, _}
}),
NPartitions,
10_000
20_000
),
lists:foreach(
fun(N) ->
@ -1919,3 +1947,57 @@ t_cluster_node_down(Config) ->
end
),
ok.
t_begin_offset_earliest(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
{ok, C} = emqtt:start_link([{proto_ver, v5}]),
on_exit(fun() -> emqtt:stop(C) end),
{ok, _} = emqtt:connect(C),
{ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2),
?check_trace(
begin
%% publish a message before the bridge is started.
NumMessages = 5,
lists:foreach(
fun(N) ->
publish(Config, [
#{
key => <<"mykey", (integer_to_binary(N))/binary>>,
value => Payload,
headers => [{<<"hkey">>, <<"hvalue">>}]
}
])
end,
lists:seq(1, NumMessages)
),
{ok, _} = create_bridge(Config, #{
<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
}),
#{num_published => NumMessages}
end,
fun(Res, _Trace) ->
#{num_published := NumMessages} = Res,
%% we should receive messages published before starting
%% the consumers
Published = receive_published(#{n => NumMessages}),
Payloads = lists:map(
fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end,
Published
),
?assert(
lists:all(
fun(#{<<"value">> := V}) -> V =:= Payload end,
Payloads
),
#{payloads => Payloads}
),
?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)),
ok
end
),
ok.

View File

@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer {
max_batch_bytes = 896KB
max_rejoin_attempts = 5
offset_commit_interval_seconds = 3
offset_reset_policy = reset_to_latest
offset_reset_policy = latest
}
topic_mapping = [
{