Merge remote-tracking branch 'origin/release-57'

This commit is contained in:
ieQu1 2024-05-23 21:04:03 +02:00
commit c952e46f08
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
58 changed files with 1624 additions and 574 deletions

View File

@ -21,7 +21,7 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.4
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.9
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise

View File

@ -64,6 +64,7 @@
{emqx_node_rebalance_status,2}.
{emqx_persistent_session_ds,1}.
{emqx_plugins,1}.
{emqx_plugins,2}.
{emqx_prometheus,1}.
{emqx_prometheus,2}.
{emqx_resource,1}.

View File

@ -246,7 +246,8 @@ fields(layout_builtin_reference) ->
reference,
#{
'readOnly' => true,
importance => ?IMPORTANCE_HIDDEN
importance => ?IMPORTANCE_LOW,
desc => ?DESC(layout_builtin_reference_type)
}
)}
].
@ -257,6 +258,8 @@ desc(builtin_local_write_buffer) ->
?DESC(builtin_local_write_buffer);
desc(layout_builtin_wildcard_optimized) ->
?DESC(layout_builtin_wildcard_optimized);
desc(layout_builtin_reference) ->
?DESC(layout_builtin_reference);
desc(_) ->
undefined.
@ -273,17 +276,12 @@ ds_schema(Options) ->
Options
).
-ifndef(TEST).
builtin_layouts() ->
[ref(layout_builtin_wildcard_optimized)].
-else.
builtin_layouts() ->
%% Reference layout stores everything in one stream, so it's not
%% suitable for production use. However, it's very simple and
%% produces a very predictabale replay order, which can be useful
%% for testing and debugging:
[ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
-endif.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -54,7 +54,7 @@ init() ->
-spec is_persistence_enabled() -> boolean().
is_persistence_enabled() ->
persistent_term:get(?PERSISTENCE_ENABLED).
persistent_term:get(?PERSISTENCE_ENABLED, false).
-spec is_persistence_enabled(emqx_types:zone()) -> boolean().
is_persistence_enabled(Zone) ->

View File

@ -629,8 +629,10 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -
class => recoverable,
retry_in_ms => RetryTimeout
}),
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0)
%% TODO: Handle unrecoverable errors.
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0);
{error, unrecoverable, Reason} ->
Session1 = skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason),
replay_streams(Session1#{replay := Rest}, ClientInfo)
end;
replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
Session = maps:remove(replay, Session0),
@ -655,6 +657,39 @@ replay_batch(Srs0, Session0, ClientInfo) ->
Error
end.
%% Handle `{error, unrecoverable, _}' returned by `enqueue_batch'.
%% Most likely they mean that the generation containing the messages
%% has been removed.
-spec skip_batch(_StreamKey, stream_state(), session(), clientinfo(), _Reason) -> session().
skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
?SLOG(info, #{
msg => "session_ds_replay_unrecoverable_error",
reason => Reason,
srs => SRS0
}),
GenEvents = fun
F(QoS, SeqNo, LastSeqNo) when SeqNo < LastSeqNo ->
FakeMsg = #message{
id = <<>>,
qos = QoS,
payload = <<>>,
topic = <<>>,
timestamp = 0
},
_ = emqx_session_events:handle_event(ClientInfo, {expired, FakeMsg}),
F(QoS, inc_seqno(QoS, SeqNo), LastSeqNo);
F(_, _, _) ->
ok
end,
%% Treat messages as expired:
GenEvents(?QOS_1, SRS0#srs.first_seqno_qos1, SRS0#srs.last_seqno_qos1),
GenEvents(?QOS_2, SRS0#srs.first_seqno_qos2, SRS0#srs.last_seqno_qos2),
SRS = SRS0#srs{it_end = end_of_stream, batch_size = 0},
%% That's it for the iterator. Mark SRS as reached the
%% `end_of_stream', and let stream scheduler do the rest:
S = emqx_persistent_session_ds_state:put_stream(StreamKey, SRS, S0),
Session#{s := S}.
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
@ -923,15 +958,16 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
),
S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
Session#{s => S};
{error, Class, Reason} ->
%% TODO: Handle unrecoverable error.
{error, recoverable, Reason} ->
?SLOG(debug, #{
msg => "failed_to_fetch_batch",
stream => StreamKey,
reason => Reason,
class => Class
class => recoverable
}),
Session0
Session0;
{error, unrecoverable, Reason} ->
skip_batch(StreamKey, Srs1, Session0, ClientInfo, Reason)
end.
enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->

View File

@ -1690,7 +1690,7 @@ fields("durable_sessions") ->
sc(
timeout_duration(),
#{
default => <<"5000ms">>,
default => <<"1s">>,
importance => ?IMPORTANCE_HIDDEN
}
)},

View File

@ -48,7 +48,11 @@ format_meta_map(Meta) ->
format_meta_map(Meta, Encode).
format_meta_map(Meta, Encode) ->
format_meta_map(Meta, Encode, [{packet, fun format_packet/2}, {payload, fun format_payload/2}]).
format_meta_map(Meta, Encode, [
{packet, fun format_packet/2},
{payload, fun format_payload/2},
{<<"payload">>, fun format_payload/2}
]).
format_meta_map(Meta, _Encode, []) ->
Meta;
@ -61,9 +65,21 @@ format_meta_map(Meta, Encode, [{Name, FormatFun} | Rest]) ->
format_meta_map(Meta, Encode, Rest)
end.
format_meta_data(Meta0, Encode) when is_map(Meta0) ->
Meta1 = format_meta_map(Meta0, Encode),
maps:map(fun(_K, V) -> format_meta_data(V, Encode) end, Meta1);
format_meta_data(Meta, Encode) when is_list(Meta) ->
[format_meta_data(Item, Encode) || Item <- Meta];
format_meta_data(Meta, Encode) when is_tuple(Meta) ->
List = erlang:tuple_to_list(Meta),
FormattedList = [format_meta_data(Item, Encode) || Item <- List],
erlang:list_to_tuple(FormattedList);
format_meta_data(Meta, _Encode) ->
Meta.
format_meta(Meta0, Encode) ->
Meta1 = maps:without([msg, clientid, peername, trace_tag], Meta0),
Meta2 = format_meta_map(Meta1, Encode),
Meta2 = format_meta_data(Meta1, Encode),
kvs_to_iolist(lists:sort(fun compare_meta_kvs/2, maps:to_list(Meta2))).
%% packet always goes first; payload always goes last

View File

@ -42,7 +42,7 @@ format(
%% an external call to create the JSON text
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
LogMap2 = LogMap1#{time => Time},
LogMap3 = prepare_log_map(LogMap2, PEncode),
LogMap3 = prepare_log_data(LogMap2, PEncode),
[emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"].
%%%-----------------------------------------------------------------
@ -85,9 +85,17 @@ do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) -
do_maybe_format_msg(Msg, Meta, Config) ->
emqx_logger_jsonfmt:format_msg(Msg, Meta, Config).
prepare_log_map(LogMap, PEncode) ->
prepare_log_data(LogMap, PEncode) when is_map(LogMap) ->
NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
maps:from_list(NewKeyValuePairs).
maps:from_list(NewKeyValuePairs);
prepare_log_data(V, PEncode) when is_list(V) ->
[prepare_log_data(Item, PEncode) || Item <- V];
prepare_log_data(V, PEncode) when is_tuple(V) ->
List = erlang:tuple_to_list(V),
PreparedList = [prepare_log_data(Item, PEncode) || Item <- List],
erlang:list_to_tuple(PreparedList);
prepare_log_data(V, _PEncode) ->
V.
prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when
is_integer(I1),
@ -118,6 +126,8 @@ prepare_key_value(payload = K, V, PEncode) ->
V
end,
{K, NewV};
prepare_key_value(<<"payload">>, V, PEncode) ->
prepare_key_value(payload, V, PEncode);
prepare_key_value(packet = K, V, PEncode) ->
NewV =
try
@ -167,10 +177,8 @@ prepare_key_value(action_id = K, V, _PEncode) ->
_:_ ->
{K, V}
end;
prepare_key_value(K, V, PEncode) when is_map(V) ->
{K, prepare_log_map(V, PEncode)};
prepare_key_value(K, V, _PEncode) ->
{K, V}.
prepare_key_value(K, V, PEncode) ->
{K, prepare_log_data(V, PEncode)}.
format_packet(undefined, _) -> "";
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).

View File

@ -473,7 +473,7 @@ zone_global_defaults() ->
idle_poll_interval => 100,
heartbeat_interval => 5000,
message_retention_period => 86400000,
renew_streams_interval => 5000,
renew_streams_interval => 1000,
session_gc_batch_size => 100,
session_gc_interval => 600000,
subscription_count_refresh_interval => 5000,

View File

@ -109,6 +109,7 @@ start(Nodes, ClusterOpts) ->
start(NodeSpecs).
start(NodeSpecs) ->
emqx_common_test_helpers:clear_screen(),
ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]),
% 1. Start bare nodes with only basic applications running
ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS),

View File

@ -987,14 +987,45 @@ call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, ConfRootKey, BridgeType,
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end.
is_enabled_bridge(ConfRootKey, BridgeType, BridgeName) ->
try emqx_bridge_v2:lookup(ConfRootKey, BridgeType, binary_to_existing_atom(BridgeName)) of
is_enabled_bridge(ConfRootKey, ActionOrSourceType, BridgeName) ->
try
emqx_bridge_v2:lookup(ConfRootKey, ActionOrSourceType, binary_to_existing_atom(BridgeName))
of
{ok, #{raw_config := ConfMap}} ->
maps:get(<<"enable">>, ConfMap, true);
maps:get(<<"enable">>, ConfMap, true) andalso
is_connector_enabled(
ActionOrSourceType,
maps:get(<<"connector">>, ConfMap)
);
{error, not_found} ->
throw(not_found)
catch
error:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found);
error:{badkey, _} ->
%% `connector' field not present. Should never happen if action/source schema
%% is properly defined.
throw(not_found)
end.
is_connector_enabled(ActionOrSourceType, ConnectorName0) ->
try
ConnectorType = emqx_bridge_v2:connector_type(ActionOrSourceType),
ConnectorName = to_existing_atom(ConnectorName0),
case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
undefined ->
throw(not_found);
Config = #{} ->
maps:get(enable, Config, true)
end
catch
throw:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found);
throw:bad_atom ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found)
@ -1407,3 +1438,9 @@ map_to_json(M0) ->
M2 = maps:without([value, <<"value">>], M1),
emqx_utils_json:encode(M2)
end.
to_existing_atom(X) ->
case emqx_utils:safe_to_existing_atom(X, utf8) of
{ok, A} -> A;
{error, _} -> throw(bad_atom)
end.

View File

@ -788,6 +788,60 @@ t_update_connector_not_found(_Config) ->
),
ok.
%% Check that https://emqx.atlassian.net/browse/EMQX-12376 is fixed
t_update_concurrent_health_check(_Config) ->
Msg = <<"Channel status check failed">>,
ok = meck:expect(
emqx_bridge_v2_test_connector,
on_get_channel_status,
fun(
_ResId,
ChannelId,
#{channels := Channels}
) ->
#{
is_conf_for_connected := Connected
} = maps:get(ChannelId, Channels),
case Connected of
true ->
connected;
false ->
{error, Msg}
end
end
),
BaseConf = (bridge_config())#{
is_conf_for_connected => false
},
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BaseConf)),
SetStatusConnected =
fun
(true) ->
Conf = BaseConf#{is_conf_for_connected => true},
%% Update the config
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
?assertMatch(
#{status := connected},
emqx_bridge_v2:health_check(bridge_type(), my_test_bridge)
);
(false) ->
Conf = BaseConf#{is_conf_for_connected => false},
%% Update the config
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
?assertMatch(
#{status := disconnected},
emqx_bridge_v2:health_check(bridge_type(), my_test_bridge)
)
end,
[
begin
Connected = (N rem 2) =:= 0,
SetStatusConnected(Connected)
end
|| N <- lists:seq(0, 20)
],
ok.
t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
%% we test the connector post config update here because we also need bridges.
Conf = bridge_config(),

View File

@ -109,6 +109,7 @@
-define(SOURCE_TYPE_STR, "mqtt").
-define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
-define(SOURCE_CONNECTOR_TYPE, ?SOURCE_TYPE).
-define(APPSPECS, [
emqx_conf,
@ -166,9 +167,19 @@ init_per_group(single = Group, Config) ->
Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
init_per_group(actions, Config) ->
[{bridge_kind, action} | Config];
[
{bridge_kind, action},
{connector_type, ?ACTION_CONNECTOR_TYPE},
{connector_name, ?ACTION_CONNECTOR_NAME}
| Config
];
init_per_group(sources, Config) ->
[{bridge_kind, source} | Config];
[
{bridge_kind, source},
{connector_type, ?SOURCE_CONNECTOR_TYPE},
{connector_name, ?SOURCE_CONNECTOR_NAME}
| Config
];
init_per_group(_Group, Config) ->
Config.
@ -202,14 +213,45 @@ end_per_group(single, Config) ->
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_action_types, Config) ->
init_per_testcase(TestCase, Config) when
TestCase =:= t_start_action_or_source_with_disabled_connector;
TestCase =:= t_action_types
->
case ?config(cluster_nodes, Config) of
undefined ->
init_mocks();
Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
end,
Config;
#{
connector_config := ConnectorConfig,
bridge_type := BridgeType,
bridge_name := BridgeName,
bridge_config := BridgeConfig
} =
case ?config(bridge_kind, Config) of
action ->
#{
connector_config => ?ACTIONS_CONNECTOR,
bridge_type => {action_type, ?ACTION_TYPE},
bridge_name => {action_name, ?ACTION_CONNECTOR_NAME},
bridge_config => {action_config, ?KAFKA_BRIDGE(?ACTION_CONNECTOR_NAME)}
};
source ->
#{
connector_config => source_connector_create_config(#{}),
bridge_type => {source_type, ?SOURCE_TYPE},
bridge_name => {source_name, ?SOURCE_CONNECTOR_NAME},
bridge_config => {source_config, source_config_base()}
}
end,
[
{connector_config, ConnectorConfig},
BridgeType,
BridgeName,
BridgeConfig
| Config
];
init_per_testcase(_TestCase, Config) ->
case ?config(cluster_nodes, Config) of
undefined ->
@ -434,7 +476,7 @@ source_connector_create_config(Overrides0) ->
source_connector_config_base(),
#{
<<"enable">> => true,
<<"type">> => ?SOURCE_TYPE,
<<"type">> => ?SOURCE_CONNECTOR_TYPE,
<<"name">> => ?SOURCE_CONNECTOR_NAME
}
),
@ -1547,3 +1589,12 @@ t_older_version_nodes_in_cluster(Config) ->
),
ok.
t_start_action_or_source_with_disabled_connector(matrix) ->
[
[single, actions],
[single, sources]
];
t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok.

View File

@ -383,6 +383,25 @@ start_connector_api(ConnectorName, ConnectorType) ->
ct:pal("connector update (http) result:\n ~p", [Res]),
Res.
enable_connector_api(ConnectorType, ConnectorName) ->
do_enable_disable_connector_api(ConnectorType, ConnectorName, enable).
disable_connector_api(ConnectorType, ConnectorName) ->
do_enable_disable_connector_api(ConnectorType, ConnectorName, disable).
do_enable_disable_connector_api(ConnectorType, ConnectorName, Op) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
{OpPath, OpStr} =
case Op of
enable -> {"true", "enable"};
disable -> {"false", "disable"}
end,
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId, "enable", OpPath]),
ct:pal(OpStr ++ " connector ~s (http)", [ConnectorId]),
Res = request(put, Path, []),
ct:pal(OpStr ++ " connector ~s (http) result:\n ~p", [ConnectorId, Res]),
Res.
get_connector_api(ConnectorType, ConnectorName) ->
ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName),
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
@ -956,3 +975,27 @@ t_on_get_status(Config, Opts) ->
)
end,
ok.
%% Verifies that attempting to start an action while its connnector is disabled does not
%% start the connector.
t_start_action_or_source_with_disabled_connector(Config) ->
#{
kind := Kind,
type := Type,
name := Name,
connector_type := ConnectorType,
connector_name := ConnectorName
} = get_common_values(Config),
?check_trace(
begin
{ok, _} = create_bridge_api(Config),
{ok, {{_, 204, _}, _, _}} = disable_connector_api(ConnectorType, ConnectorName),
?assertMatch(
{error, {{_, 400, _}, _, _}},
op_bridge_api(Kind, "start", Type, Name)
),
ok
end,
[]
),
ok.

View File

@ -228,3 +228,7 @@ t_sync_query(Config) ->
postgres_bridge_connector_on_query_return
),
ok.
t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok.

View File

@ -112,11 +112,13 @@ on_start(
),
ClientId = client_id(InstanceId),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
ClientCfg = namespace(#{acl_info => ACLInfo}, Config),
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg = #{acl_info => ACLInfo, namespace => Namespace},
State = #{
client_id => ClientId,
acl_info => ACLInfo,
namespace => Namespace,
installed_channels => #{}
},
@ -139,12 +141,13 @@ on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
namespace := Namespace,
acl_info := ACLInfo
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo, Namespace),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
@ -152,16 +155,18 @@ on_add_channel(
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo
ACLInfo,
Namespace
) ->
#{
topic := Topic,
sync_timeout := SyncTimeout
sync_timeout := SyncTimeout,
strategy := Strategy
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo),
ProducerOpts = make_producer_opts(Conf, ACLInfo, Namespace, Strategy),
Templates = parse_template(Conf),
DispatchStrategy = parse_dispatch_strategy(Conf),
DispatchStrategy = parse_dispatch_strategy(Strategy),
State = #{
topic => Topic,
topic_tokens => TopicTks,
@ -330,11 +335,11 @@ parse_template([], Templates) ->
Templates.
%% returns a procedure to generate the produce context
parse_dispatch_strategy(#{strategy := roundrobin}) ->
parse_dispatch_strategy(roundrobin) ->
fun(_) ->
#{}
end;
parse_dispatch_strategy(#{strategy := Template}) ->
parse_dispatch_strategy(Template) ->
Tokens = emqx_placeholder:preproc_tmpl(Template),
fun(Msg) ->
#{
@ -400,12 +405,20 @@ make_producer_opts(
send_buffer := SendBuff,
refresh_interval := RefreshInterval
},
ACLInfo
ACLInfo,
Namespace,
Strategy
) ->
#{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo)
acl_info => emqx_secret:wrap(ACLInfo),
namespace => Namespace,
partitioner =>
case Strategy of
roundrobin -> roundrobin;
_ -> key_dispatch
end
}.
acl_info(<<>>, _, _) ->
@ -424,10 +437,6 @@ acl_info(AccessKey, SecretKey, SecurityToken) when is_binary(AccessKey) ->
acl_info(_, _, _) ->
#{}.
namespace(ClientCfg, Config) ->
Namespace = maps:get(namespace, Config, <<>>),
ClientCfg#{namespace => Namespace}.
create_producers_map(ClientId) ->
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
ok.

View File

@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels})
ChannelState = #{mode := direct} ->
run_simple_upload(InstId, Tag, Data, ChannelState, Config);
ChannelState = #{mode := aggregated} ->
run_aggregated_upload(InstId, [Data], ChannelState);
run_aggregated_upload(InstId, Tag, [Data], ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
case maps:get(Tag, Channels, undefined) of
ChannelState = #{mode := aggregated} ->
Records = [Data0 | [Data || {_, Data} <- Rest]],
run_aggregated_upload(InstId, Records, ChannelState);
run_aggregated_upload(InstId, Tag, Records, ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
@ -362,8 +362,12 @@ run_simple_upload(
{error, map_error(Reason)}
end.
run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
Timestamp = erlang:system_time(second),
emqx_trace:rendered_action_template(ChannelID, #{
mode => aggregated,
records => Records
}),
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
ok ->
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),

View File

@ -63,10 +63,14 @@ fields(action) ->
fields(?ACTION) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
mkunion(mode, #{
<<"direct">> => ?R_REF(s3_direct_upload_parameters),
<<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters)
}),
mkunion(
mode,
#{
<<"direct">> => ?R_REF(s3_direct_upload_parameters),
<<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters)
},
<<"direct">>
),
#{
required => true,
desc => ?DESC(s3_upload),
@ -87,7 +91,7 @@ fields(s3_direct_upload_parameters) ->
hoconsc:mk(
direct,
#{
required => true,
default => <<"direct">>,
desc => ?DESC(s3_direct_upload_mode)
}
)},
@ -187,13 +191,22 @@ fields(s3_upload_resource_opts) ->
]).
mkunion(Field, Schemas) ->
hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
mkunion(Field, Schemas, none).
scunion(_Field, Schemas, all_union_members) ->
mkunion(Field, Schemas, Default) ->
hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Default, Arg) end).
scunion(_Field, Schemas, _Default, all_union_members) ->
maps:values(Schemas);
scunion(Field, Schemas, {value, Value}) ->
Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
scunion(Field, Schemas, Default, {value, Value}) ->
Selector =
case maps:get(emqx_utils_conv:bin(Field), Value, undefined) of
undefined ->
Default;
X ->
emqx_utils_conv:bin(X)
end,
case maps:find(Selector, Schemas) of
{ok, Schema} ->
[Schema];
_Error ->

View File

@ -119,7 +119,7 @@ current_rate(all) ->
current_rate(Node) when Node == node() ->
try
{ok, Rate} = do_call(current_rate),
{ok, Rate}
{ok, adjust_individual_node_metrics(Rate)}
catch
_E:R ->
?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
@ -156,8 +156,8 @@ current_rate_cluster() ->
case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of
{badrpc, Reason} ->
{badrpc, Reason};
Rate ->
{ok, Rate}
Metrics ->
{ok, adjust_synthetic_cluster_metrics(Metrics)}
end.
%% -------------------------------------------------------------------------------------------------
@ -264,8 +264,8 @@ merge_cluster_rate(Node, Cluster) ->
%% cluster-synced values
(disconnected_durable_sessions, V, NCluster) ->
NCluster#{disconnected_durable_sessions => V};
(durable_subscriptions, V, NCluster) ->
NCluster#{durable_subscriptions => V};
(subscriptions_durable, V, NCluster) ->
NCluster#{subscriptions_durable => V};
(topics, V, NCluster) ->
NCluster#{topics => V};
(retained_msg_count, V, NCluster) ->
@ -283,6 +283,28 @@ merge_cluster_rate(Node, Cluster) ->
end,
maps:fold(Fun, Cluster, Node).
adjust_individual_node_metrics(Metrics0) ->
%% ensure renamed
emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0).
adjust_synthetic_cluster_metrics(Metrics0) ->
DSSubs = maps:get(subscriptions_durable, Metrics0, 0),
RamSubs = maps:get(subscriptions, Metrics0, 0),
DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics0, 0),
Metrics1 = maps:update_with(
subscriptions,
fun(Subs) -> Subs + DSSubs end,
0,
Metrics0
),
Metrics = maps:put(subscriptions_ram, RamSubs, Metrics1),
maps:update_with(
connections,
fun(RamConns) -> RamConns + DisconnectedDSs end,
DisconnectedDSs,
Metrics
).
format({badrpc, Reason}) ->
{badrpc, Reason};
format(Data) ->

View File

@ -19,6 +19,7 @@
-include("emqx_dashboard.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hocon_types.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-behaviour(minirest_api).
@ -159,7 +160,12 @@ dashboard_samplers_fun(Latest) ->
monitor_current(get, #{bindings := Bindings}) ->
RawNode = maps:get(node, Bindings, <<"all">>),
emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1).
case emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1) of
?OK(Rates) ->
?OK(maybe_reject_cluster_only_metrics(RawNode, Rates));
Error ->
Error
end.
-spec current_rate(atom()) ->
{error, term()}
@ -242,3 +248,12 @@ swagger_desc_format(Format) ->
swagger_desc_format(Format, Type) ->
Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
list_to_binary(io_lib:format(Format ++ "~p ~p seconds", [Type, Interval])).
maybe_reject_cluster_only_metrics(<<"all">>, Rates) ->
Rates;
maybe_reject_cluster_only_metrics(_Node, Rates) ->
ClusterOnlyMetrics = [
subscriptions_durable,
disconnected_durable_sessions
],
maps:without(ClusterOnlyMetrics, Rates).

View File

@ -49,6 +49,8 @@
"}"
>>).
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
%%--------------------------------------------------------------------
%% CT boilerplate
%%--------------------------------------------------------------------
@ -79,21 +81,37 @@ end_per_suite(_Config) ->
ok.
init_per_group(persistent_sessions = Group, Config) ->
Apps = emqx_cth_suite:start(
AppSpecsFn = fun(Enable) ->
Port =
case Enable of
true -> "18083";
false -> "0"
end,
[
emqx_conf,
{emqx, "durable_sessions {enable = true}"},
{emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 18083 }\n"
"dashboard.sample_interval = 1s"
lists:concat([
"dashboard.listeners.http { bind = " ++ Port ++ " }\n",
"dashboard.sample_interval = 1s\n",
"dashboard.listeners.http.enable = " ++ atom_to_list(Enable)
])
)
],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config];
]
end,
NodeSpecs = [
{dashboard_monitor1, #{apps => AppSpecsFn(true)}},
{dashboard_monitor2, #{apps => AppSpecsFn(false)}}
],
Nodes =
[N1 | _] = emqx_cth_cluster:start(
NodeSpecs,
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
?ON(N1, {ok, _} = emqx_common_test_http:create_default_app()),
[{cluster, Nodes} | Config];
init_per_group(common = Group, Config) ->
Apps = emqx_cth_suite:start(
[
@ -111,7 +129,11 @@ init_per_group(common = Group, Config) ->
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_group(_Group, Config) ->
end_per_group(persistent_sessions, Config) ->
Cluster = ?config(cluster, Config),
emqx_cth_cluster:stop(Cluster),
ok;
end_per_group(common, Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
@ -196,13 +218,22 @@ t_monitor_current_api(_) ->
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
%% We rename `durable_subscriptions' key.
Key =/= durable_subscriptions
],
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
?assert(maps:is_key(<<"disconnected_durable_sessions">>, Rate)),
ClusterOnlyMetrics = [durable_subscriptions, disconnected_durable_sessions],
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate), #{key => Key, rates => NodeRate})
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
not lists:member(Key, ClusterOnlyMetrics)
],
?assertNot(maps:is_key(<<"subscriptions_durable">>, NodeRate)),
?assertNot(maps:is_key(<<"subscriptions_ram">>, NodeRate)),
?assertNot(maps:is_key(<<"disconnected_durable_sessions">>, NodeRate)),
ok.
t_monitor_current_api_live_connections(_) ->
@ -290,8 +321,11 @@ t_monitor_reset(_) ->
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST,
%% We rename `durable_subscriptions' key.
Key =/= durable_subscriptions
],
?assert(maps:is_key(<<"subscriptions_durable">>, Rate)),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(1, #{?snk_kind := dashboard_monitor_flushed}),
@ -313,26 +347,36 @@ t_monitor_api_error(_) ->
ok.
%% Verifies that subscriptions from persistent sessions are correctly accounted for.
t_persistent_session_stats(_Config) ->
t_persistent_session_stats(Config) ->
[N1, N2 | _] = ?config(cluster, Config),
%% pre-condition
true = emqx_persistent_message:is_persistence_enabled(),
true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()),
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
NonPSClient = start_and_connect(#{
port => Port1,
clientid => <<"non-ps">>,
expiry_interval => 0
}),
PSClient = start_and_connect(#{
clientid => <<"ps">>,
PSClient1 = start_and_connect(#{
port => Port1,
clientid => <<"ps1">>,
expiry_interval => 30
}),
PSClient2 = start_and_connect(#{
port => Port2,
clientid => <<"ps2">>,
expiry_interval => 30
}),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic">>, 2),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
@ -341,43 +385,66 @@ t_persistent_session_stats(_Config) ->
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
<<"connections">> := 2,
<<"connections">> := 3,
<<"disconnected_durable_sessions">> := 0,
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"durable_subscriptions">> := 4,
<<"subscriptions">> := 4
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
request(["monitor_current"])
?ON(N1, request(["monitor_current"]))
)
end),
%% Sanity checks
PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes),
PSRouteCount = ?ON(N1, emqx_persistent_session_ds_router:stats(n_routes)),
?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
PSSubCount = ?ON(N1, emqx_persistent_session_bookkeeper:get_subscription_count()),
?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
%% Now with disconnected but alive persistent sessions
{ok, {ok, _}} =
?wait_async_action(
emqtt:disconnect(PSClient),
emqtt:disconnect(PSClient1),
#{?snk_kind := dashboard_monitor_flushed}
),
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
<<"connections">> := 1,
<<"connections">> := 3,
<<"disconnected_durable_sessions">> := 1,
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"durable_subscriptions">> := 4,
<<"subscriptions">> := 4
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
request(["monitor_current"])
?ON(N1, request(["monitor_current"]))
)
end),
{ok, {ok, _}} =
?wait_async_action(
emqtt:disconnect(PSClient2),
#{?snk_kind := dashboard_monitor_flushed}
),
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
<<"connections">> := 3,
<<"disconnected_durable_sessions">> := 2,
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count `commont/topic' twice and get 8
%% instead of 6 here.
<<"topics">> := 8,
<<"subscriptions">> := 8,
<<"subscriptions_ram">> := 4,
<<"subscriptions_durable">> := 4
}},
?ON(N1, request(["monitor_current"]))
)
end),
@ -453,15 +520,21 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
ok.
start_and_connect(Opts) ->
Defaults = #{clean_start => false, expiry_interval => 30},
Defaults = #{
clean_start => false,
expiry_interval => 30,
port => 1883
},
#{
clientid := ClientId,
clean_start := CleanStart,
expiry_interval := EI
expiry_interval := EI,
port := Port
} = maps:merge(Defaults, Opts),
{ok, Client} = emqtt:start_link([
{clientid, ClientId},
{clean_start, CleanStart},
{port, Port},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => EI}}
]),
@ -470,3 +543,7 @@ start_and_connect(Opts) ->
end),
{ok, _} = emqtt:connect(Client),
Client.
get_mqtt_port(Node, Type) ->
{_IP, Port} = ?ON(Node, emqx_config:get([listeners, Type, default, bind])),
Port.

View File

@ -124,6 +124,8 @@ The following application environment variables are available:
- `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage.
- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`.
Runtime settings for the durable storages can be modified via CLI as well as the REST API.
The following CLI commands are available:

View File

@ -33,6 +33,12 @@
]).
-export([which_dbs/0, which_shards/1]).
%% Debug:
-export([
get_egress_workers/1,
get_shard_workers/1
]).
%% behaviour callbacks:
-export([init/1]).
@ -73,11 +79,15 @@ start_shard({DB, Shard}) ->
start_egress({DB, Shard}) ->
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, not_found}.
stop_shard({DB, Shard}) ->
Sup = ?via(#?shards_sup{db = DB}),
ok = supervisor:terminate_child(Sup, Shard),
ok = supervisor:delete_child(Sup, Shard).
case supervisor:terminate_child(Sup, Shard) of
ok ->
supervisor:delete_child(Sup, Shard);
{error, Reason} ->
{error, Reason}
end.
-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
terminate_storage({DB, Shard}) ->
@ -111,6 +121,28 @@ which_dbs() ->
Key = {n, l, #?db_sup{_ = '_', db = '$1'}},
gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]).
%% @doc Get pids of all local egress servers for the given DB.
-spec get_egress_workers(emqx_ds:db()) -> #{_Shard => pid()}.
get_egress_workers(DB) ->
Children = supervisor:which_children(?via(#?egress_sup{db = DB})),
L = [{Shard, Child} || {Shard, Child, _, _} <- Children, is_pid(Child)],
maps:from_list(L).
%% @doc Get pids of all local shard servers for the given DB.
-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}.
get_shard_workers(DB) ->
Shards = supervisor:which_children(?via(#?shards_sup{db = DB})),
L = lists:flatmap(
fun
({_Shard, Sup, _, _}) when is_pid(Sup) ->
[{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)];
(_) ->
[]
end,
Shards
),
maps:from_list(L).
%%================================================================================
%% behaviour callbacks
%%================================================================================

View File

@ -561,12 +561,27 @@ list_nodes() ->
%% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(EXPR),
-define(SAFE_ERPC(EXPR),
try
EXPR
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
error:RPCError__ = {erpc, _} ->
{error, recoverable, RPCError__}
end
).
-define(SHARD_RPC(DB, SHARD, NODE, BODY),
case
emqx_ds_replication_layer_shard:servers(
DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred)
)
of
[{_, NODE} | _] ->
begin
BODY
end;
[] ->
{error, recoverable, replica_offline}
end
).
@ -623,44 +638,79 @@ ra_drop_generation(DB, Shard, GenId) ->
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(Time),
?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs))
).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time))
).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs))
).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)
)
).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey))
).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
RPCError = {badrpc, _} ->
{error, recoverable, RPCError};
Other ->
Other
end.
?SHARD_RPC(
DB,
Shard,
Node,
case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
Err = {badrpc, _} ->
{error, recoverable, Err};
Ret ->
Ret
end
).
ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize))
).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
Reply = ?SHARD_RPC(
DB,
Shard,
Node,
?SAFE_ERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard))
),
case Reply of
Gens = #{} ->
maps:map(
fun(_GenId, Data = #{since := Since, until := Until}) ->
@ -711,6 +761,14 @@ apply(
#{?tag := add_generation, ?since := Since},
#{db_shard := DBShard, latest := Latest0} = State0
) ->
?tp(
info,
ds_replication_layer_add_generation,
#{
shard => DBShard,
since => Since
}
),
{Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
State = State0#{latest := Latest},
@ -721,6 +779,15 @@ apply(
#{?tag := update_config, ?since := Since, ?config := Opts},
#{db_shard := DBShard, latest := Latest0} = State0
) ->
?tp(
notice,
ds_replication_layer_update_config,
#{
shard => DBShard,
config => Opts,
since => Since
}
),
{Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
State = State0#{latest := Latest},
@ -730,6 +797,14 @@ apply(
#{?tag := drop_generation, ?generation := GenId},
#{db_shard := DBShard} = State
) ->
?tp(
info,
ds_replication_layer_drop_generation,
#{
shard => DBShard,
generation => GenId
}
),
Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
{State, Result};
apply(

View File

@ -33,7 +33,7 @@
-export([start_link/2, store_batch/3]).
%% behavior callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([]).
@ -129,6 +129,21 @@ init([DB, Shard]) ->
},
{ok, S}.
format_status(Status) ->
maps:map(
fun
(state, #s{db = DB, shard = Shard, queue = Q}) ->
#{
db => DB,
shard => Shard,
queue => queue:len(Q)
};
(_, Val) ->
Val
end,
Status
).
handle_call(
#enqueue_req{
messages = Msgs,

View File

@ -694,12 +694,12 @@ ensure_site() ->
forget_node(Node) ->
Sites = node_sites(Node),
Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
case [Reason || {error, Reason} <- Results] of
[] ->
Result = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
case Result of
Ok when is_list(Ok) ->
ok;
Errors ->
logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors])
{error, Reason} ->
logger:error("Failed to forget leaving node ~p: ~p", [Node, Reason])
end.
%% @doc Returns sorted list of sites shards are replicated across.

View File

@ -28,8 +28,7 @@
%% Dynamic server location API
-export([
servers/3,
server/3
servers/3
]).
%% Membership
@ -83,16 +82,15 @@ server_name(DB, Shard, Site) ->
%%
-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server(), ...] when
Order :: leader_preferred | undefined.
servers(DB, Shard, _Order = leader_preferred) ->
-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server()] when
Order :: leader_preferred | local_preferred | undefined.
servers(DB, Shard, leader_preferred) ->
get_servers_leader_preferred(DB, Shard);
servers(DB, Shard, local_preferred) ->
get_servers_local_preferred(DB, Shard);
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = local_preferred) ->
get_server_local_preferred(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
ClusterName = get_cluster_name(DB, Shard),
@ -104,17 +102,24 @@ get_servers_leader_preferred(DB, Shard) ->
get_online_servers(DB, Shard)
end.
get_server_local_preferred(DB, Shard) ->
%% NOTE: Contact either local server or a random replica.
get_servers_local_preferred(DB, Shard) ->
%% Return list of servers, where the local replica (if exists) is
%% the first element. Note: result is _NOT_ shuffled. This can be
%% bad for the load balancing, but it makes results more
%% deterministic. Caller that doesn't care about that can shuffle
%% the results by itself.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
pick_local(Servers);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
pick_random(get_online_servers(DB, Shard))
Servers = get_online_servers(DB, Shard);
Servers when is_list(Servers) ->
ok
end,
case lists:keytake(node(), 2, Servers) of
false ->
Servers;
{value, Local, Rest} ->
[Local | Rest]
end.
lookup_leader(DB, Shard) ->
@ -139,17 +144,6 @@ filter_online(Servers) ->
is_server_online({_Name, Node}) ->
Node == node() orelse lists:member(Node, nodes()).
pick_local(Servers) ->
case lists:keyfind(node(), 2, Servers) of
Local when is_tuple(Local) ->
Local;
false ->
pick_random(Servers)
end.
pick_random(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->
memoize(fun cluster_name/2, [DB, Shard]).

View File

@ -41,6 +41,7 @@
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-define(TRIGGER_PENDING_TIMEOUT, 60_000).
-define(TRANS_RETRY_TIMEOUT, 5_000).
-define(CRASH_RETRY_DELAY, 20_000).
@ -106,7 +107,7 @@ handle_call(_Call, _From, State) ->
-spec handle_cast(_Cast, state()) -> {noreply, state()}.
handle_cast(#trigger_transitions{}, State) ->
{noreply, handle_pending_transitions(State)};
{noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT};
handle_cast(_Cast, State) ->
{noreply, State}.
@ -118,13 +119,15 @@ handle_cast(_Cast, State) ->
handle_info({timeout, _TRef, allocate}, State) ->
{noreply, handle_allocate_shards(State)};
handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) ->
{noreply, handle_shard_changed(Shard, State)};
{noreply, handle_shard_changed(Shard, State), ?TRIGGER_PENDING_TIMEOUT};
handle_info({changed, _}, State) ->
{noreply, State};
{noreply, State, ?TRIGGER_PENDING_TIMEOUT};
handle_info({'EXIT', Pid, Reason}, State) ->
{noreply, handle_exit(Pid, Reason, State)};
{noreply, handle_exit(Pid, Reason, State), ?TRIGGER_PENDING_TIMEOUT};
handle_info(timeout, State) ->
{noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT};
handle_info(_Info, State) ->
{noreply, State}.
{noreply, State, ?TRIGGER_PENDING_TIMEOUT}.
-spec terminate(_Reason, state()) -> _Ok.
terminate(_Reason, State = #{db := DB, shards := Shards}) ->
@ -229,6 +232,7 @@ handle_transition(DB, Shard, Trans, Handler) ->
domain => [emqx, ds, DB, shard_transition]
}),
?tp(
debug,
dsrepl_shard_transition_begin,
#{shard => Shard, db => DB, transition => Trans, pid => self()}
),
@ -240,7 +244,12 @@ apply_handler(Fun, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans]).
trans_add_local(DB, Shard, {add, Site}) ->
logger:info(#{msg => "Adding new local shard replica", site => Site}),
logger:info(#{
msg => "Adding new local shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_add_local(membership, DB, Shard).
do_add_local(membership = Stage, DB, Shard) ->
@ -251,6 +260,8 @@ do_add_local(membership = Stage, DB, Shard) ->
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
db => DB,
shard => Shard,
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
@ -261,10 +272,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
ready ->
logger:info(#{msg => "Local shard replica ready"});
logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard});
Status ->
logger:warning(#{
msg => "Still waiting for local shard replica to be ready",
db => DB,
shard => Shard,
status => Status,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
@ -273,7 +286,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
end.
trans_drop_local(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Dropping local shard replica", site => Site}),
logger:info(#{
msg => "Dropping local shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_drop_local(DB, Shard).
do_drop_local(DB, Shard) ->
@ -293,17 +311,24 @@ do_drop_local(DB, Shard) ->
end.
trans_rm_unresponsive(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
logger:info(#{
msg => "Removing unresponsive shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_rm_unresponsive(DB, Shard, Site).
do_rm_unresponsive(DB, Shard, Site) ->
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
ok ->
logger:info(#{msg => "Unresponsive shard replica removed"});
logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard});
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
db => DB,
shard => Shard,
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
@ -341,6 +366,7 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of
[{Track, #transhdl{shard = Shard, trans = Trans}}] ->
?tp(
debug,
dsrepl_shard_transition_end,
#{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
),
@ -361,9 +387,10 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
State;
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
State;
handle_transition_exit(Shard, Trans, Reason, State) ->
handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
logger:warning(#{
msg => "Shard membership transition failed",
db => DB,
shard => Shard,
transition => Trans,
reason => Reason,

View File

@ -35,7 +35,7 @@
make_iterator/5,
make_delete_iterator/5,
update_iterator/4,
next/5,
next/6,
delete_next/6,
post_creation_actions/1,
@ -161,7 +161,7 @@
%% GVar used for idle detection:
-define(IDLE_DETECT, idle_detect).
-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)).
-define(EPOCH(S, TS), (TS bsr S#s.ts_offset)).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -424,23 +424,21 @@ next(
Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
It = #{?storage_key := Stream},
BatchSize,
Now
Now,
IsCurrent
) ->
init_counters(),
%% Compute safe cutoff time. It's the point in time where the last
%% complete epoch ends, so we need to know the current time to
%% compute it. This is needed because new keys can be added before
%% the iterator.
IsWildcard =
%%
%% This is needed to avoid situations when the iterator advances
%% to position k1, and then a new message with k2, such that k2 <
%% k1 is inserted. k2 would be missed.
HasCutoff =
case Stream of
{_StaticKey, []} -> false;
_ -> true
end,
SafeCutoffTime =
case IsWildcard of
true ->
(Now bsr TSOffset) bsl TSOffset;
false ->
{_StaticKey, []} ->
%% Iterators scanning streams without varying topic
%% levels can operate on incomplete epochs, since new
%% matching keys for the single topic are added in
@ -450,10 +448,27 @@ next(
%% filters operating on streams with varying parts:
%% iterator can jump to the next topic and then it
%% won't backtrack.
false;
_ ->
%% New batches are only added to the current
%% generation. We can ignore cutoff time for old
%% generations:
IsCurrent
end,
SafeCutoffTime =
case HasCutoff of
true ->
?EPOCH(Schema, Now) bsl TSOffset;
false ->
1 bsl TSBits - 1
end,
try
next_until(Schema, It, SafeCutoffTime, BatchSize)
case next_until(Schema, It, SafeCutoffTime, BatchSize) of
{ok, _, []} when not IsCurrent ->
{ok, end_of_stream};
Result ->
Result
end
after
report_counters(Shard)
end.
@ -538,6 +553,17 @@ delete_next_until(
end.
handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
%% If the last message was published more than one epoch ago, and
%% the shard remains idle, we need to advance safety cutoff
%% interval to make sure the last epoch becomes visible to the
%% readers.
%%
%% We do so by emitting a dummy event that will be persisted by
%% the replication layer. Processing it will advance the
%% replication layer's clock.
%%
%% This operation is latched to avoid publishing events on every
%% tick.
case ets:lookup(Gvars, ?IDLE_DETECT) of
[{?IDLE_DETECT, Latch, LastWrittenTs}] ->
ok;
@ -546,13 +572,17 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
LastWrittenTs = 0
end,
case Latch of
false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
%% Note: + 1 above delays the event by one epoch to add a
%% safety margin.
ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
[dummy_event];
_ ->
[]
end;
handle_event(_ShardId, _Data, _Time, _Event) ->
%% `dummy_event' goes here and does nothing. But it forces update
%% of `Time' in the replication layer.
[].
%%================================================================================

View File

@ -52,7 +52,7 @@
]).
%% gen_server
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([db_dir/1]).
@ -80,6 +80,10 @@
-define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
-define(delete_stream(GENERATION, INNER), [GENERATION | INNER]).
%% Wrappers for the storage events:
-define(storage_event(GEN_ID, PAYLOAD), #{0 := 3333, 1 := GEN_ID, 2 := PAYLOAD}).
-define(mk_storage_event(GEN_ID, PAYLOAD), #{0 => 3333, 1 => GEN_ID, 2 => PAYLOAD}).
%%================================================================================
%% Type declarations
%%================================================================================
@ -244,8 +248,8 @@
) ->
emqx_ds:make_delete_iterator_result(_Iterator).
-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) ->
{ok, Iter, [emqx_types:message()]} | {error, _}.
-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) ->
{ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
-callback delete_next(
shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
@ -297,25 +301,30 @@ store_batch(Shard, Messages, Options) ->
[{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
}),
{GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
T0 = erlang:monotonic_time(microsecond),
Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
Error
end,
T1 = erlang:monotonic_time(microsecond),
%% TODO store->prepare
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result;
%% FIXME: always store messages in the current generation
case generation_at(Shard, Time) of
{GenId, #{module := Mod, data := GenData}} ->
T0 = erlang:monotonic_time(microsecond),
Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
Error
end,
T1 = erlang:monotonic_time(microsecond),
%% TODO store->prepare
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result;
not_found ->
ignore
end;
prepare_batch(_Shard, [], _Options) ->
ignore.
@ -444,15 +453,12 @@ update_iterator(
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) ->
case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} ->
Current = generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of
{ok, _GenIter, []} when GenId < Current ->
%% This is a past generation. Storage layer won't write
%% any more messages here. The iterator reached the end:
%% the stream has been fully replayed.
{ok, end_of_stream};
IsCurrent = GenId =:= generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize, Now, IsCurrent) of
{ok, GenIter, Batch} ->
{ok, Iter#{?enc := GenIter}, Batch};
{ok, end_of_stream} ->
{ok, end_of_stream};
Error = {error, _, _} ->
Error
end;
@ -513,7 +519,7 @@ add_generation(ShardId, Since) ->
list_generations_with_lifetimes(ShardId) ->
gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity).
-spec drop_generation(shard_id(), gen_id()) -> ok.
-spec drop_generation(shard_id(), gen_id()) -> ok | {error, _}.
drop_generation(ShardId, GenId) ->
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
@ -563,6 +569,7 @@ start_link(Shard = {_, _}, Options) ->
init({ShardId, Options}) ->
process_flag(trap_exit, true),
?tp(info, ds_storage_init, #{shard => ShardId}),
logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
erase_schema_runtime(ShardId),
clear_all_checkpoints(ShardId),
@ -586,6 +593,17 @@ init({ShardId, Options}) ->
commit_metadata(S),
{ok, S}.
format_status(Status) ->
maps:map(
fun
(state, State) ->
format_state(State);
(_, Val) ->
Val
end,
Status
).
handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
case handle_update_config(S0, Since, Options) of
S = #s{} ->
@ -758,18 +776,31 @@ handle_drop_generation(S0, GenId) ->
} = S0,
#{module := Mod, cf_refs := GenCFRefs} = GenSchema,
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of
ok ->
CFRefs = OldCFRefs -- GenCFRefs,
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
S = S0#s{
cf_refs = CFRefs,
shard = Shard,
schema = Schema
},
{ok, S}
end.
try
Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData)
catch
EC:Err:Stack ->
?tp(
error,
ds_storage_layer_failed_to_drop_generation,
#{
shard => ShardId,
EC => Err,
stacktrace => Stack,
generation => GenId,
s => format_state(S0)
}
)
end,
CFRefs = OldCFRefs -- GenCFRefs,
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
S = S0#s{
cf_refs = CFRefs,
shard = Shard,
schema = Schema
},
{ok, S}.
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
generation().
@ -815,10 +846,6 @@ new_generation(ShardId, DB, Schema0, Since) ->
next_generation_id(GenId) ->
GenId + 1.
-spec prev_generation_id(gen_id()) -> gen_id().
prev_generation_id(GenId) when GenId > 0 ->
GenId - 1.
%% @doc Commit current state of the server to both rocksdb and the persistent term
-spec commit_metadata(server_state()) -> ok.
commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
@ -914,23 +941,23 @@ handle_accept_snapshot(ShardId) ->
Dir = db_dir(ShardId),
emqx_ds_storage_snapshot:new_writer(Dir).
%% FIXME: currently this interface is a hack to handle safe cutoff
%% timestamp in LTS. It has many shortcomings (can lead to infinite
%% loops if the CBM is not careful; events from one generation may be
%% sent to the next one, etc.) and the API is not well thought out in
%% general.
%%
%% The mechanism of storage layer events should be refined later.
-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
-spec handle_event(shard_id(), emqx_ds:time(), Event) -> [Event].
handle_event(Shard, Time, ?storage_event(GenId, Event)) ->
case generation_get(Shard, GenId) of
not_found ->
[];
#{module := Mod, data := GenData} ->
case erlang:function_exported(Mod, handle_event, 4) of
true ->
NewEvents = Mod:handle_event(Shard, GenData, Time, Event),
[?mk_storage_event(GenId, E) || E <- NewEvents];
false ->
[]
end
end;
handle_event(Shard, Time, Event) ->
{_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
case erlang:function_exported(Mod, handle_event, 4) of
true ->
Mod:handle_event(Shard, GenData, Time, Event);
false ->
[]
end.
GenId = generation_current(Shard),
handle_event(Shard, Time, ?mk_storage_event(GenId, Event)).
%%--------------------------------------------------------------------------------
%% Schema access
@ -941,6 +968,25 @@ generation_current(Shard) ->
#{current_generation := Current} = get_schema_runtime(Shard),
Current.
%% TODO: remove me
-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()} | not_found.
generation_at(Shard, Time) ->
Schema = #{current_generation := Current} = get_schema_runtime(Shard),
generation_at(Time, Current, Schema).
generation_at(Time, GenId, Schema) ->
case Schema of
#{?GEN_KEY(GenId) := Gen} ->
case Gen of
#{since := Since} when Time < Since andalso GenId > 0 ->
generation_at(Time, GenId - 1, Schema);
_ ->
{GenId, Gen}
end;
_ ->
not_found
end.
-spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
generation_get(Shard, GenId) ->
case get_schema_runtime(Shard) of
@ -964,19 +1010,23 @@ generations_since(Shard, Since) ->
Schema
).
-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}.
generation_at(Shard, Time) ->
Schema = #{current_generation := Current} = get_schema_runtime(Shard),
generation_at(Time, Current, Schema).
generation_at(Time, GenId, Schema) ->
#{?GEN_KEY(GenId) := Gen} = Schema,
case Gen of
#{since := Since} when Time < Since andalso GenId > 0 ->
generation_at(Time, prev_generation_id(GenId), Schema);
_ ->
{GenId, Gen}
end.
format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
#{
id => ShardId,
db => DB,
cf_refs => CFRefs,
schema => Schema,
shard =>
maps:map(
fun
(?GEN_KEY(_), _Schema) ->
'...';
(_K, Val) ->
Val
end,
Shard
)
}.
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).

View File

@ -38,7 +38,7 @@
make_iterator/5,
make_delete_iterator/5,
update_iterator/4,
next/5,
next/6,
delete_next/6
]).
@ -148,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
last_seen_message_key = DSKey
}}.
next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) ->
#it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
{ok, ITHandle} = rocksdb:iterator(DB, CF, []),
Action =
@ -162,7 +162,12 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
{Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
rocksdb:iterator_close(ITHandle),
It = It0#it{last_seen_message_key = Key},
{ok, It, lists:reverse(Messages)}.
case Messages of
[] when not IsCurrent ->
{ok, end_of_stream};
_ ->
{ok, It, lists:reverse(Messages)}
end.
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
#delete_it{

View File

@ -179,8 +179,7 @@ make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
| {ok, end_of_stream}
| {error, _}.
delete_next(Node, DB, Shard, Iter, Selector, BatchSize) ->
emqx_rpc:call(
Shard,
erpc:call(
Node,
emqx_ds_replication_layer,
do_delete_next_v4,

View File

@ -67,10 +67,16 @@ t_00_smoke_open_drop(_Config) ->
%% A simple smoke test that verifies that storing the messages doesn't
%% crash
t_01_smoke_store(_Config) ->
DB = default,
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
Msg = message(<<"foo/bar">>, <<"foo">>, 0),
?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).
?check_trace(
#{timetrap => 10_000},
begin
DB = default,
?assertMatch(ok, emqx_ds:open_db(DB, opts())),
Msg = message(<<"foo/bar">>, <<"foo">>, 0),
?assertMatch(ok, emqx_ds:store_batch(DB, [Msg]))
end,
[]
).
%% A simple smoke test that verifies that getting the list of streams
%% doesn't crash and that iterators can be opened.

View File

@ -183,7 +183,7 @@ t_rebalance(Config) ->
],
Stream1 = emqx_utils_stream:interleave(
[
{50, Stream0},
{10, Stream0},
emqx_utils_stream:const(add_generation)
],
false
@ -479,11 +479,13 @@ t_rebalance_offline_restarts(Config) ->
%%
shard_server_info(Node, DB, Shard, Site, Info) ->
Server = shard_server(Node, DB, Shard, Site),
{Server, ds_repl_shard(Node, server_info, [Info, Server])}.
shard_server(Node, DB, Shard, Site) ->
ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
?ON(
Node,
begin
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
{Server, emqx_ds_replication_layer_shard:server_info(Info, Server)}
end
).
ds_repl_meta(Node, Fun) ->
ds_repl_meta(Node, Fun, []).
@ -499,9 +501,6 @@ ds_repl_meta(Node, Fun, Args) ->
error(meta_op_failed)
end.
ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).

View File

@ -27,25 +27,6 @@ opts() ->
%%
t_idempotent_store_batch(_Config) ->
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
%% Push some messages to the shard.
Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)],
GenTs = 30,
Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)],
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
%% Add new generation and push the same batch + some more.
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)),
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
%% First batch should have been handled idempotently.
?assertEqual(
Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
),
ok = stop_shard(Pid).
t_snapshot_take_restore(_Config) ->
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
@ -77,7 +58,7 @@ t_snapshot_take_restore(_Config) ->
%% Verify that the restored shard contains the messages up until the snapshot.
{ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
?assertEqual(
snabbkaffe_diff:assert_lists_eq(
Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
).

View File

@ -21,6 +21,8 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
-dialyzer({no_match, [format_plugin_avsc_and_i18n/1]}).
-export([
api_spec/0,
fields/1,
@ -178,6 +180,9 @@ schema("/plugins/:name/config") ->
responses => #{
%% avro data, json encoded
200 => hoconsc:mk(binary()),
400 => emqx_dashboard_swagger:error_codes(
['BAD_CONFIG'], <<"Plugin Config Not Found">>
),
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
}
},
@ -488,13 +493,13 @@ update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
plugin_config(get, #{bindings := #{name := NameVsn}}) ->
case emqx_plugins:describe(NameVsn) of
{ok, _} ->
case emqx_plugins:get_config(NameVsn) of
{ok, AvroJson} ->
case emqx_plugins:get_config(NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found) of
{ok, AvroJson} when is_map(AvroJson) ->
{200, #{<<"content-type">> => <<"'application/json'">>}, AvroJson};
{error, _} ->
{ok, ?plugin_conf_not_found} ->
{400, #{
code => 'BAD_CONFIG',
message => <<"Failed to get plugin config">>
message => <<"Plugin Config Not Found">>
}}
end;
_ ->
@ -503,7 +508,7 @@ plugin_config(get, #{bindings := #{name := NameVsn}}) ->
plugin_config(put, #{bindings := #{name := NameVsn}, body := AvroJsonMap}) ->
case emqx_plugins:describe(NameVsn) of
{ok, _} ->
case emqx_plugins:decode_plugin_avro_config(NameVsn, AvroJsonMap) of
case emqx_plugins:decode_plugin_config_map(NameVsn, AvroJsonMap) of
{ok, AvroValueConfig} ->
Nodes = emqx:running_nodes(),
%% cluster call with config in map (binary key-value)
@ -534,7 +539,7 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
{error, Reason} ->
{400, #{code => 'BAD_POSITION', message => Reason}};
Position ->
case emqx_plugins:ensure_enabled(Name, Position, _ConfLocation = global) of
case emqx_plugins:ensure_enabled(Name, Position, global) of
ok ->
{204};
{error, Reason} ->
@ -599,9 +604,9 @@ ensure_action(Name, restart) ->
ok.
%% for RPC plugin avro encoded config update
do_update_plugin_config(Name, AvroJsonMap, PluginConfigMap) ->
do_update_plugin_config(NameVsn, AvroJsonMap, PluginConfigMap) ->
%% TODO: maybe use `PluginConfigMap` to validate config
emqx_plugins:put_config(Name, AvroJsonMap, PluginConfigMap).
emqx_plugins:put_config(NameVsn, AvroJsonMap, PluginConfigMap).
%%--------------------------------------------------------------------
%% Helper functions
@ -694,10 +699,11 @@ aggregate_status([{Node, Plugins} | List], Acc) ->
),
aggregate_status(List, NewAcc).
-if(?EMQX_RELEASE_EDITION == ee).
format_plugin_avsc_and_i18n(NameVsn) ->
#{
avsc => try_read_file(fun() -> emqx_plugins:plugin_avsc(NameVsn) end),
i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n(NameVsn) end)
avsc => try_read_file(fun() -> emqx_plugins:plugin_schema_json(NameVsn) end),
i18n => try_read_file(fun() -> emqx_plugins:plugin_i18n_json(NameVsn) end)
}.
try_read_file(Fun) ->
@ -706,6 +712,11 @@ try_read_file(Fun) ->
_ -> null
end.
-else.
format_plugin_avsc_and_i18n(_NameVsn) ->
#{avsc => null, i18n => null}.
-endif.
% running_status: running loaded, stopped
%% config_status: not_configured disable enable
plugin_status(#{running_status := running}) -> running;

View File

@ -29,11 +29,18 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).

View File

@ -33,7 +33,6 @@ init_per_suite(Config) ->
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(Config) ->

View File

@ -51,11 +51,18 @@ groups() ->
].
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_group(api_status_endpoint, Config) ->
[{get_status_path, ["api", "v5", "status"]} | Config];

View File

@ -25,12 +25,19 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok = emqx_mgmt_cli:load(),
Config.
[{apps, Apps} | Config].
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(t_autocluster_leave = TC, Config) ->
[Core1, Core2, Repl1, Repl2] =

View File

@ -21,15 +21,27 @@
-define(PLUGIN_SERDE_TAB, emqx_plugins_schema_serde_tab).
-define(CONFIG_FORMAT_AVRO, config_format_avro).
-define(CONFIG_FORMAT_BIN, config_format_bin).
-define(CONFIG_FORMAT_MAP, config_format_map).
-define(plugin_conf_not_found, plugin_conf_not_found).
-type schema_name() :: binary().
-type avsc_path() :: string().
-type encoded_data() :: iodata().
-type decoded_data() :: map().
%% "my_plugin-0.1.0"
-type name_vsn() :: binary() | string().
%% the parse result of the JSON info file
-type plugin_info() :: map().
-type schema_json_map() :: map().
-type i18n_json_map() :: map().
-type raw_plugin_config_content() :: binary().
-type plugin_config_map() :: map().
-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
-record(plugin_schema_serde, {
name :: schema_name(),
eval_context :: term(),

View File

@ -1,8 +0,0 @@
%% -*- mode: erlang -*-
{"0.1.0",
[ {<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -16,8 +16,11 @@
-module(emqx_plugins).
-include_lib("emqx/include/logger.hrl").
-feature(maybe_expr, enable).
-include("emqx_plugins.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -25,15 +28,16 @@
-export([
describe/1,
plugin_avsc/1,
plugin_i18n/1,
plugin_avro/1,
plugin_schema_json/1,
plugin_i18n_json/1,
raw_plugin_config_content/1,
parse_name_vsn/1,
make_name_vsn_string/2
]).
%% Package operations
-export([
ensure_installed/0,
ensure_installed/1,
ensure_uninstalled/1,
ensure_enabled/1,
@ -65,9 +69,10 @@
%% Package utils
-export([
decode_plugin_avro_config/2,
decode_plugin_config_map/2,
install_dir/0,
avsc_file_path/1
avsc_file_path/1,
with_plugin_avsc/1
]).
%% `emqx_config_handler' API
@ -79,7 +84,10 @@
-export([get_tar/1]).
%% Internal export
-export([do_ensure_started/1]).
-export([
ensure_config_map/1,
do_ensure_started/1
]).
%% for test cases
-export([put_config_internal/2]).
@ -96,36 +104,26 @@
-define(MAX_KEEP_BACKUP_CONFIGS, 10).
%% "my_plugin-0.1.0"
-type name_vsn() :: binary() | string().
%% the parse result of the JSON info file
-type plugin() :: map().
-type schema_json() :: map().
-type i18n_json() :: map().
-type avro_binary() :: binary().
-type plugin_config() :: map().
-type position() :: no_move | front | rear | {before, name_vsn()} | {behind, name_vsn()}.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Describe a plugin.
-spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
-spec describe(name_vsn()) -> {ok, plugin_info()} | {error, any()}.
describe(NameVsn) ->
read_plugin_info(NameVsn, #{fill_readme => true}).
-spec plugin_avsc(name_vsn()) -> {ok, schema_json()} | {error, any()}.
plugin_avsc(NameVsn) ->
-spec plugin_schema_json(name_vsn()) -> {ok, schema_json_map()} | {error, any()}.
plugin_schema_json(NameVsn) ->
read_plugin_avsc(NameVsn).
-spec plugin_i18n(name_vsn()) -> {ok, i18n_json()} | {error, any()}.
plugin_i18n(NameVsn) ->
-spec plugin_i18n_json(name_vsn()) -> {ok, i18n_json_map()} | {error, any()}.
plugin_i18n_json(NameVsn) ->
read_plugin_i18n(NameVsn).
-spec plugin_avro(name_vsn()) -> {ok, avro_binary()} | {error, any()}.
plugin_avro(NameVsn) ->
read_plugin_avro(NameVsn).
-spec raw_plugin_config_content(name_vsn()) -> {ok, raw_plugin_config_content()} | {error, any()}.
raw_plugin_config_content(NameVsn) ->
read_plugin_hocon(NameVsn).
parse_name_vsn(NameVsn) when is_binary(NameVsn) ->
parse_name_vsn(binary_to_list(NameVsn));
@ -141,15 +139,32 @@ make_name_vsn_string(Name, Vsn) ->
%%--------------------------------------------------------------------
%% Package operations
%% @doc Start all configured plugins are started.
-spec ensure_installed() -> ok.
ensure_installed() ->
Fun = fun(#{name_vsn := NameVsn}) ->
case ensure_installed(NameVsn) of
ok -> [];
{error, Reason} -> [{NameVsn, Reason}]
end
end,
ok = for_plugins(Fun).
%% @doc Install a .tar.gz package placed in install_dir.
-spec ensure_installed(name_vsn()) -> ok | {error, map()}.
ensure_installed(NameVsn) ->
case read_plugin_info(NameVsn, #{}) of
{ok, _} ->
ok;
ok,
_ = maybe_ensure_plugin_config(NameVsn);
{error, _} ->
ok = purge(NameVsn),
do_ensure_installed(NameVsn)
case ensure_exists_and_installed(NameVsn) of
ok ->
maybe_post_op_after_installed(NameVsn);
{error, _Reason} = Err ->
Err
end
end.
%% @doc Ensure files and directories for the given plugin are being deleted.
@ -230,7 +245,17 @@ delete_package(NameVsn) ->
%% @doc Start all configured plugins are started.
-spec ensure_started() -> ok.
ensure_started() ->
ok = for_plugins(fun ?MODULE:do_ensure_started/1).
Fun = fun
(#{name_vsn := NameVsn, enable := true}) ->
case do_ensure_started(NameVsn) of
ok -> [];
{error, Reason} -> [{NameVsn, Reason}]
end;
(#{name_vsn := NameVsn, enable := false}) ->
?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
[]
end,
ok = for_plugins(Fun).
%% @doc Start a plugin from Management API or CLI.
%% the input is a <name>-<vsn> string.
@ -247,7 +272,17 @@ ensure_started(NameVsn) ->
%% @doc Stop all plugins before broker stops.
-spec ensure_stopped() -> ok.
ensure_stopped() ->
for_plugins(fun ?MODULE:ensure_stopped/1).
Fun = fun
(#{name_vsn := NameVsn, enable := true}) ->
case ensure_stopped(NameVsn) of
ok -> [];
{error, Reason} -> [{NameVsn, Reason}]
end;
(#{name_vsn := NameVsn, enable := false}) ->
?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
[]
end,
ok = for_plugins(Fun).
%% @doc Stop a plugin from Management API or CLI.
-spec ensure_stopped(name_vsn()) -> ok | {error, term()}.
@ -260,37 +295,48 @@ ensure_stopped(NameVsn) ->
end
).
get_config(Name, Vsn, Options, Default) ->
get_config(make_name_vsn_string(Name, Vsn), Options, Default).
get_config(Name, Vsn, Opt, Default) ->
get_config(make_name_vsn_string(Name, Vsn), Opt, Default).
-spec get_config(name_vsn()) ->
{ok, plugin_config()}
{ok, plugin_config_map() | any()}
| {error, term()}.
get_config(NameVsn) ->
get_config(bin(NameVsn), #{format => ?CONFIG_FORMAT_MAP}).
get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{}).
-spec get_config(name_vsn(), Options :: map()) ->
{ok, avro_binary() | plugin_config()}
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP | ?CONFIG_FORMAT_BIN) ->
{ok, raw_plugin_config_content() | plugin_config_map() | any()}
| {error, term()}.
get_config(NameVsn, #{format := ?CONFIG_FORMAT_AVRO}) ->
%% no default value when get raw binary config
case read_plugin_avro(NameVsn) of
{ok, _AvroJson} = Res -> Res;
{error, _Reason} = Err -> Err
end;
get_config(NameVsn, Options = #{format := ?CONFIG_FORMAT_MAP}) ->
get_config(NameVsn, Options, #{}).
get_config(NameVsn, ?CONFIG_FORMAT_MAP) ->
get_config(NameVsn, ?CONFIG_FORMAT_MAP, #{});
get_config(NameVsn, ?CONFIG_FORMAT_BIN) ->
get_config_bin(NameVsn).
get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
{ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), Default)}.
%% Present default config value only in map format.
-spec get_config(name_vsn(), ?CONFIG_FORMAT_MAP, any()) ->
{ok, plugin_config_map() | any()}
| {error, term()}.
get_config(NameVsn, ?CONFIG_FORMAT_MAP, Default) ->
{ok, persistent_term:get(?PLUGIN_PERSIS_CONFIG_KEY(bin(NameVsn)), Default)}.
get_config_bin(NameVsn) ->
%% no default value when get raw binary config
case read_plugin_hocon(NameVsn) of
{ok, _Map} = Res -> Res;
{error, _Reason} = Err -> Err
end.
%% @doc Update plugin's config.
%% RPC call from Management API or CLI.
%% the avro Json Map and plugin config ALWAYS be valid before calling this function.
put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin),
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
%% the plugin config Json Map and plugin config ALWAYS be valid before calling this function.
put_config(NameVsn, ConfigJsonMap, DecodedPluginConfig) when not is_binary(NameVsn) ->
put_config(bin(NameVsn), ConfigJsonMap, DecodedPluginConfig);
put_config(NameVsn, ConfigJsonMap, _DecodedPluginConfig) ->
HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
%% TODO: callback in plugin's on_config_changed (config update by mgmt API)
%% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap),
ok.
%% @doc Stop and then start the plugin.
@ -302,7 +348,7 @@ restart(NameVsn) ->
%% @doc List all installed plugins.
%% Including the ones that are installed, but not enabled in config.
-spec list() -> [plugin()].
-spec list() -> [plugin_info()].
list() ->
Pattern = filename:join([install_dir(), "*", "release.json"]),
All = lists:filtermap(
@ -323,15 +369,24 @@ list() ->
%%--------------------------------------------------------------------
%% Package utils
-spec decode_plugin_avro_config(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
decode_plugin_avro_config(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
decode_plugin_avro_config(NameVsn, emqx_utils_json:encode(AvroJsonMap));
decode_plugin_avro_config(NameVsn, AvroJsonBin) ->
-spec decode_plugin_config_map(name_vsn(), map() | binary()) -> {ok, map()} | {error, any()}.
decode_plugin_config_map(NameVsn, AvroJsonMap) when is_map(AvroJsonMap) ->
decode_plugin_config_map(NameVsn, emqx_utils_json:encode(AvroJsonMap));
decode_plugin_config_map(NameVsn, AvroJsonBin) ->
case emqx_plugins_serde:decode(NameVsn, AvroJsonBin) of
{ok, Config} -> {ok, Config};
{error, ReasonMap} -> {error, ReasonMap}
end.
-spec with_plugin_avsc(name_vsn()) -> boolean().
with_plugin_avsc(NameVsn) ->
case read_plugin_info(NameVsn, #{fill_readme => false}) of
{ok, #{<<"with_config_schema">> := WithAvsc}} when is_boolean(WithAvsc) ->
WithAvsc;
_ ->
false
end.
get_config_interal(Key, Default) when is_atom(Key) ->
get_config_interal([Key], Default);
get_config_interal(Path, Default) ->
@ -438,7 +493,6 @@ do_ensure_installed(NameVsn) ->
ok = write_tar_file_content(install_dir(), TarContent),
case read_plugin_info(NameVsn, #{}) of
{ok, _} ->
ok = maybe_post_op_after_install(NameVsn),
ok;
{error, Reason} ->
?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
@ -603,7 +657,11 @@ tryit(WhichOp, F) ->
exception => Reason,
stacktrace => Stacktrace
}),
{error, {failed, WhichOp}}
{error, #{
which_op => WhichOp,
exception => Reason,
stacktrace => Stacktrace
}}
end.
%% read plugin info from the JSON file
@ -611,16 +669,16 @@ tryit(WhichOp, F) ->
read_plugin_info(NameVsn, Options) ->
tryit(
atom_to_list(?FUNCTION_NAME),
fun() -> {ok, do_read_plugin2(NameVsn, Options)} end
fun() -> {ok, do_read_plugin(NameVsn, Options)} end
).
do_read_plugin(NameVsn) ->
do_read_plugin2(NameVsn, #{}).
do_read_plugin(NameVsn, #{}).
do_read_plugin2(NameVsn, Option) ->
do_read_plugin3(NameVsn, info_file_path(NameVsn), Option).
do_read_plugin(NameVsn, Option) ->
do_read_plugin(NameVsn, info_file_path(NameVsn), Option).
do_read_plugin3(NameVsn, InfoFilePath, Options) ->
do_read_plugin(NameVsn, InfoFilePath, Options) ->
{ok, PlainMap} = (read_file_fun(InfoFilePath, "bad_info_file", #{read_mode => ?JSON_MAP}))(),
Info0 = check_plugin(PlainMap, NameVsn, InfoFilePath),
Info1 = plugins_readme(NameVsn, Options, Info0),
@ -642,12 +700,12 @@ read_plugin_i18n(NameVsn, Options) ->
read_file_fun(i18n_file_path(NameVsn), "bad_i18n_file", Options)
).
read_plugin_avro(NameVsn) ->
read_plugin_avro(NameVsn, #{read_mode => ?RAW_BIN}).
read_plugin_avro(NameVsn, Options) ->
read_plugin_hocon(NameVsn) ->
read_plugin_hocon(NameVsn, #{read_mode => ?RAW_BIN}).
read_plugin_hocon(NameVsn, Options) ->
tryit(
atom_to_list(?FUNCTION_NAME),
read_file_fun(avro_config_file(NameVsn), "bad_avro_file", Options)
read_file_fun(plugin_config_file(NameVsn), "bad_hocon_file", Options)
).
ensure_exists_and_installed(NameVsn) ->
@ -659,7 +717,7 @@ ensure_exists_and_installed(NameVsn) ->
case get_tar(NameVsn) of
{ok, TarContent} ->
ok = file:write_file(pkg_file_path(NameVsn), TarContent),
ok = do_ensure_installed(NameVsn);
do_ensure_installed(NameVsn);
_ ->
%% If not, try to get it from the cluster.
do_get_from_cluster(NameVsn)
@ -668,33 +726,51 @@ ensure_exists_and_installed(NameVsn) ->
do_get_from_cluster(NameVsn) ->
Nodes = [N || N <- mria:running_nodes(), N /= node()],
case get_from_any_node(Nodes, NameVsn, []) of
case get_plugin_tar_from_any_node(Nodes, NameVsn, []) of
{ok, TarContent} ->
ok = file:write_file(pkg_file_path(NameVsn), TarContent),
ok = do_ensure_installed(NameVsn);
{error, NodeErrors} when Nodes =/= [] ->
?SLOG(error, #{
msg => "failed_to_copy_plugin_from_other_nodes",
ErrMeta = #{
error_msg => "failed_to_copy_plugin_from_other_nodes",
name_vsn => NameVsn,
node_errors => NodeErrors
}),
{error, plugin_not_found};
node_errors => NodeErrors,
reason => not_found
},
?SLOG(error, ErrMeta),
{error, ErrMeta};
{error, _} ->
?SLOG(error, #{
msg => "no_nodes_to_copy_plugin_from",
name_vsn => NameVsn
}),
{error, plugin_not_found}
ErrMeta = #{
error_msg => "no_nodes_to_copy_plugin_from",
name_vsn => NameVsn,
reason => not_found
},
?SLOG(error, ErrMeta),
{error, ErrMeta}
end.
get_from_any_node([], _NameVsn, Errors) ->
get_plugin_tar_from_any_node([], _NameVsn, Errors) ->
{error, Errors};
get_from_any_node([Node | T], NameVsn, Errors) ->
get_plugin_tar_from_any_node([Node | T], NameVsn, Errors) ->
case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of
{ok, _} = Res ->
Res;
Err ->
get_from_any_node(T, NameVsn, [{Node, Err} | Errors])
get_plugin_tar_from_any_node(T, NameVsn, [{Node, Err} | Errors])
end.
get_plugin_config_from_any_node([], _NameVsn, Errors) ->
{error, Errors};
get_plugin_config_from_any_node([Node | T], NameVsn, Errors) ->
case
emqx_plugins_proto_v2:get_config(
Node, NameVsn, ?CONFIG_FORMAT_MAP, ?plugin_conf_not_found, 5_000
)
of
{ok, _} = Res ->
Res;
Err ->
get_plugin_config_from_any_node(T, NameVsn, [{Node, Err} | Errors])
end.
plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
@ -1011,29 +1087,31 @@ configured() ->
get_config_interal(states, []).
for_plugins(ActionFun) ->
case lists:flatmap(fun(I) -> for_plugin(I, ActionFun) end, configured()) of
[] -> ok;
Errors -> erlang:error(#{function => ActionFun, errors => Errors})
case lists:flatmap(ActionFun, configured()) of
[] ->
ok;
Errors ->
ErrMeta = #{function => ActionFun, errors => Errors},
?tp(
for_plugins_action_error_occurred,
ErrMeta
),
?SLOG(error, ErrMeta),
ok
end.
for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
case Fun(NameVsn) of
ok -> [];
{error, Reason} -> [{NameVsn, Reason}]
end;
for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
[].
maybe_post_op_after_install(NameVsn) ->
maybe_post_op_after_installed(NameVsn) ->
_ = maybe_load_config_schema(NameVsn),
_ = maybe_create_config_dir(NameVsn),
_ = ensure_state(NameVsn, no_move, false, global),
ok.
maybe_load_config_schema(NameVsn) ->
AvscPath = avsc_file_path(NameVsn),
filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath).
_ =
with_plugin_avsc(NameVsn) andalso
filelib:is_regular(AvscPath) andalso
do_load_config_schema(NameVsn, AvscPath),
_ = maybe_create_config_dir(NameVsn).
do_load_config_schema(NameVsn, AvscPath) ->
case emqx_plugins_serde:add_schema(NameVsn, AvscPath) of
@ -1043,28 +1121,107 @@ do_load_config_schema(NameVsn, AvscPath) ->
end.
maybe_create_config_dir(NameVsn) ->
ConfigDir = plugin_config_dir(NameVsn),
case filelib:ensure_path(ConfigDir) of
ok ->
ok;
with_plugin_avsc(NameVsn) andalso
do_create_config_dir(NameVsn).
do_create_config_dir(NameVsn) ->
case plugin_config_dir(NameVsn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_create_plugin_config_dir",
dir => ConfigDir,
reason => Reason
}),
{error, {mkdir_failed, ConfigDir, Reason}}
{error, {gen_config_dir_failed, Reason}};
ConfigDir ->
case filelib:ensure_path(ConfigDir) of
ok ->
%% get config from other nodes or get from tarball
_ = maybe_ensure_plugin_config(NameVsn),
ok;
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_create_plugin_config_dir",
dir => ConfigDir,
reason => Reason
}),
{error, {mkdir_failed, ConfigDir, Reason}}
end
end.
maybe_ensure_plugin_config(NameVsn) ->
maybe
true ?= with_plugin_avsc(NameVsn),
_ = ensure_plugin_config(NameVsn)
else
_ -> ok
end.
ensure_plugin_config(NameVsn) ->
%% fetch plugin hocon config from cluster
Nodes = [N || N <- mria:running_nodes(), N /= node()],
ensure_plugin_config(NameVsn, Nodes).
ensure_plugin_config(NameVsn, []) ->
?SLOG(debug, #{
msg => "default_plugin_config_used",
name_vsn => NameVsn,
reason => "no_other_running_nodes"
}),
cp_default_config_file(NameVsn);
ensure_plugin_config(NameVsn, Nodes) ->
case get_plugin_config_from_any_node(Nodes, NameVsn, []) of
{ok, ConfigMap} when is_map(ConfigMap) ->
HoconBin = hocon_pp:do(ConfigMap, #{}),
ok = file:write_file(plugin_config_file(NameVsn), HoconBin),
ensure_config_map(NameVsn);
_ ->
?SLOG(error, #{msg => "config_not_found_from_cluster", name_vsn => NameVsn}),
%% otherwise cp default hocon file
%% i.e. Clean installation
cp_default_config_file(NameVsn)
end.
cp_default_config_file(NameVsn) ->
%% always copy default hocon file into config dir when can not get config from other nodes
Source = default_plugin_config_file(NameVsn),
Destination = plugin_config_file(NameVsn),
maybe
true ?= filelib:is_regular(Source),
%% destination path not existed (not configured)
true ?= (not filelib:is_regular(Destination)),
case file:copy(Source, Destination) of
{ok, _} ->
ok;
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_copy_plugin_default_hocon_config",
source => Source,
destination => Destination,
reason => Reason
})
end
else
_ -> ensure_config_map(NameVsn)
end.
ensure_config_map(NameVsn) ->
with_plugin_avsc(NameVsn) andalso
do_ensure_config_map(NameVsn).
do_ensure_config_map(NameVsn) ->
case read_plugin_hocon(NameVsn, #{read_mode => ?JSON_MAP}) of
{ok, ConfigJsonMap} ->
{ok, Config} = decode_plugin_config_map(NameVsn, ConfigJsonMap),
put_config(NameVsn, ConfigJsonMap, Config);
_ ->
?SLOG(warning, #{msg => "failed_to_read_plugin_config_hocon", name_vsn => NameVsn}),
ok
end.
%% @private Backup the current config to a file with a timestamp suffix and
%% then save the new config to the config file.
backup_and_write_avro_bin(NameVsn, AvroBin) ->
backup_and_write_hocon_bin(NameVsn, HoconBin) ->
%% this may fail, but we don't care
%% e.g. read-only file system
Path = avro_config_file(NameVsn),
Path = plugin_config_file(NameVsn),
_ = filelib:ensure_dir(Path),
TmpFile = Path ++ ".tmp",
case file:write_file(TmpFile, AvroBin) of
case file:write_file(TmpFile, HoconBin) of
ok ->
backup_and_replace(Path, TmpFile);
{error, Reason} ->
@ -1146,9 +1303,29 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
plugin_dir(NameVsn) ->
wrap_list_path(filename:join([install_dir(), NameVsn])).
-spec plugin_config_dir(name_vsn()) -> string().
-spec plugin_priv_dir(name_vsn()) -> string().
plugin_priv_dir(NameVsn) ->
case read_plugin_info(NameVsn, #{fill_readme => false}) of
{ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} ->
AppDir = make_name_vsn_string(Name, Vsn),
wrap_list_path(filename:join([plugin_dir(NameVsn), AppDir, "priv"]));
_ ->
wrap_list_path(filename:join([install_dir(), NameVsn, "priv"]))
end.
-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.
plugin_config_dir(NameVsn) ->
wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])).
case parse_name_vsn(NameVsn) of
{ok, NameAtom, _Vsn} ->
wrap_list_path(filename:join([emqx:data_dir(), "plugins", atom_to_list(NameAtom)]));
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_generate_plugin_config_dir_for_plugin",
plugin_namevsn => NameVsn,
reason => Reason
}),
{error, Reason}
end.
%% Files
-spec pkg_file_path(name_vsn()) -> string().
@ -1161,15 +1338,20 @@ info_file_path(NameVsn) ->
-spec avsc_file_path(name_vsn()) -> string().
avsc_file_path(NameVsn) ->
wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])).
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_schema.avsc"])).
-spec avro_config_file(name_vsn()) -> string().
avro_config_file(NameVsn) ->
wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])).
-spec plugin_config_file(name_vsn()) -> string().
plugin_config_file(NameVsn) ->
wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.hocon"])).
%% should only used when plugin installing
-spec default_plugin_config_file(name_vsn()) -> string().
default_plugin_config_file(NameVsn) ->
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config.hocon"])).
-spec i18n_file_path(name_vsn()) -> string().
i18n_file_path(NameVsn) ->
wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])).
wrap_list_path(filename:join([plugin_priv_dir(NameVsn), "config_i18n.json"])).
-spec readme_file(name_vsn()) -> string().
readme_file(NameVsn) ->

View File

@ -27,8 +27,9 @@
start(_Type, _Args) ->
%% load all pre-configured
ok = emqx_plugins:ensure_started(),
{ok, Sup} = emqx_plugins_sup:start_link(),
ok = emqx_plugins:ensure_installed(),
ok = emqx_plugins:ensure_started(),
ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins),
{ok, Sup}.

View File

@ -33,7 +33,6 @@
init/1,
handle_call/3,
handle_cast/2,
handle_continue/2,
terminate/2
]).
@ -126,11 +125,10 @@ init(_) ->
]),
State = #{},
AvscPaths = get_plugin_avscs(),
{ok, State, {continue, {build_serdes, AvscPaths}}}.
handle_continue({build_serdes, AvscPaths}, State) ->
%% force build all schemas at startup
%% otherwise plugin schema may not be available when needed
_ = build_serdes(AvscPaths),
{noreply, State}.
{ok, State}.
handle_call({build_serdes, NameVsn, AvscPath}, _From, State) ->
BuildRes = do_build_serde({NameVsn, AvscPath}),
@ -153,10 +151,10 @@ terminate(_Reason, _State) ->
-spec get_plugin_avscs() -> [{string(), string()}].
get_plugin_avscs() ->
Pattern = filename:join([emqx_plugins:install_dir(), "*", "config_schema.avsc"]),
Pattern = filename:join([emqx_plugins:install_dir(), "*", "*", "priv", "config_schema.avsc"]),
lists:foldl(
fun(AvscPath, AccIn) ->
[_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
[_, _, _, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
[{to_bin(NameVsn), AvscPath} | AccIn]
end,
_Acc0 = [],

View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugins_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_tar/3,
get_config/5
]).
-include("emqx_plugins.hrl").
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.7.0".
-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any()}.
get_tar(Node, NameVsn, Timeout) ->
rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout).
-spec get_config(
node(), name_vsn(), ?CONFIG_FORMAT_MAP, any(), timeout()
) -> {ok, map() | any()} | {error, any()}.
get_config(Node, NameVsn, Opt, Default, Timeout) ->
rpc:call(Node, emqx_plugins, get_config, [NameVsn, Opt, Default], Timeout).

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin).
-define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)).
@ -273,9 +274,15 @@ t_start_restart_and_stop(Config) ->
%% fake enable bar-2
ok = ensure_state(Bar2, rear, true),
%% should cause an error
?assertError(
#{function := _, errors := [_ | _]},
emqx_plugins:ensure_started()
?check_trace(
emqx_plugins:ensure_started(),
fun(Trace) ->
?assertMatch(
[#{function := _, errors := [_ | _]}],
?of_kind(for_plugins_action_error_occurred, Trace)
),
ok
end
),
%% but demo plugin should still be running
assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
@ -337,7 +344,7 @@ t_enable_disable({'end', Config}) ->
t_enable_disable(Config) ->
NameVsn = proplists:get_value(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn),
?assertEqual([], emqx_plugins:configured()),
?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
ok = emqx_plugins:ensure_enabled(NameVsn),
?assertEqual([#{name_vsn => NameVsn, enable => true}], emqx_plugins:configured()),
ok = emqx_plugins:ensure_disabled(NameVsn),
@ -379,9 +386,10 @@ t_bad_tar_gz(Config) ->
}},
emqx_plugins:ensure_installed("fake-vsn")
),
%% the plugin tarball can not be found on any nodes
?assertMatch(
{error, #{
error_msg := "failed_to_extract_plugin_package",
error_msg := "no_nodes_to_copy_plugin_from",
reason := not_found
}},
emqx_plugins:ensure_installed("nonexisting")
@ -556,7 +564,7 @@ t_load_config_from_cli({'end', Config}) ->
t_load_config_from_cli(Config) when is_list(Config) ->
NameVsn = ?config(name_vsn, Config),
ok = emqx_plugins:ensure_installed(NameVsn),
?assertEqual([], emqx_plugins:configured()),
?assertEqual([#{name_vsn => NameVsn, enable => false}], emqx_plugins:configured()),
ok = emqx_plugins:ensure_enabled(NameVsn),
ok = emqx_plugins:ensure_started(NameVsn),
Params0 = unused,
@ -687,6 +695,14 @@ group_t_copy_plugin_to_a_new_node(Config) ->
%% see: emqx_conf_app:init_conf/0
ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]),
{ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]),
%% Plugin config should be synced from `CopyFromNode`
%% by application `emqx` and `emqx_conf`
%% FIXME: in test case, we manually do it here
ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [[states], CopyFromPluginsState]),
ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []),
ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []),
?assertMatch(
{ok, #{running_status := running, config_status := enabled}},
rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])
@ -739,6 +755,16 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
ct:pal("~p install_dir:\n ~p", [
CopyToNode, erpc:call(CopyToNode, file, list_dir, [ToInstallDir])
]),
%% Plugin config should be synced from `CopyFromNode`
%% by application `emqx` and `emqx_conf`
%% FIXME: in test case, we manually do it here
ok = rpc:call(CopyToNode, emqx_plugins, put_config_internal, [
[states], [#{enable => true, name_vsn => NameVsn}]
]),
ok = rpc:call(CopyToNode, emqx_plugins, ensure_installed, []),
ok = rpc:call(CopyToNode, emqx_plugins, ensure_started, []),
?assertMatch(
{ok, #{running_status := running, config_status := enabled}},
rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])
@ -785,6 +811,11 @@ group_t_cluster_leave(Config) ->
ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
ok = erpc:call(N2, emqx_plugins, ensure_installed, [NameVsn]),
ok = erpc:call(N2, emqx_plugins, ensure_started, [NameVsn]),
ok = erpc:call(N2, emqx_plugins, ensure_enabled, [NameVsn]),
Params = unused,
%% 2 nodes running
?assertMatch(

View File

@ -294,19 +294,19 @@ fetch_cluster_consistented_data() ->
}.
aggre_or_zip_init_acc() ->
#{
stats_data => maps:from_keys(metrics_name(stats_metric_meta()), []),
vm_data => maps:from_keys(metrics_name(vm_metric_meta()), []),
cluster_data => maps:from_keys(metrics_name(cluster_metric_meta()), []),
emqx_packet_data => maps:from_keys(metrics_name(emqx_packet_metric_meta()), []),
emqx_message_data => maps:from_keys(metrics_name(message_metric_meta()), []),
emqx_delivery_data => maps:from_keys(metrics_name(delivery_metric_meta()), []),
emqx_client_data => maps:from_keys(metrics_name(client_metric_meta()), []),
emqx_session_data => maps:from_keys(metrics_name(session_metric_meta()), []),
emqx_olp_data => maps:from_keys(metrics_name(olp_metric_meta()), []),
emqx_acl_data => maps:from_keys(metrics_name(acl_metric_meta()), []),
emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []),
mria_data => maps:from_keys(metrics_name(mria_metric_meta()), [])
(maybe_add_ds_meta())#{
stats_data => meta_to_init_from(stats_metric_meta()),
vm_data => meta_to_init_from(vm_metric_meta()),
cluster_data => meta_to_init_from(cluster_metric_meta()),
emqx_packet_data => meta_to_init_from(emqx_packet_metric_meta()),
emqx_message_data => meta_to_init_from(message_metric_meta()),
emqx_delivery_data => meta_to_init_from(delivery_metric_meta()),
emqx_client_data => meta_to_init_from(client_metric_meta()),
emqx_session_data => meta_to_init_from(session_metric_meta()),
emqx_olp_data => meta_to_init_from(olp_metric_meta()),
emqx_acl_data => meta_to_init_from(acl_metric_meta()),
emqx_authn_data => meta_to_init_from(authn_metric_meta()),
mria_data => meta_to_init_from(mria_metric_meta())
}.
logic_sum_metrics() ->
@ -656,6 +656,18 @@ emqx_metric_data(MetricNameTypeKeyL, Mode) ->
MetricNameTypeKeyL
).
%%==========
%% Durable Storage
maybe_add_ds_meta() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
#{
ds_data => meta_to_init_from(emqx_ds_builtin_metrics:prometheus_meta())
};
false ->
#{}
end.
%%==========
%% Bytes && Packets
emqx_packet_metric_meta() ->
@ -1116,6 +1128,9 @@ zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
meta_to_init_from(Meta) ->
maps:from_keys(metrics_name(Meta), []).
metrics_name(MetricsAll) ->
[Name || {Name, _, _} <- MetricsAll].

View File

@ -542,9 +542,9 @@ handle_event(enter, _OldState, ?state_stopped = State, Data) ->
{keep_state_and_data, []};
%% The following events can be handled in any other state
handle_event(
{call, From}, {add_channel, ChannelId, _Config}, State, Data
{call, From}, {add_channel, ChannelId, Config}, State, Data
) ->
handle_not_connected_add_channel(From, ChannelId, State, Data);
handle_not_connected_add_channel(From, ChannelId, Config, State, Data);
handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data
) ->
@ -678,8 +678,8 @@ add_channels(Data) ->
Channels = Data#data.added_channels,
NewChannels = lists:foldl(
fun
({ChannelID, #{enable := true}}, Acc) ->
maps:put(ChannelID, channel_status(), Acc);
({ChannelID, #{enable := true} = Config}, Acc) ->
maps:put(ChannelID, channel_status_not_added(Config), Acc);
({_, #{enable := false}}, Acc) ->
Acc
end,
@ -702,7 +702,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
%% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status_new_waiting_for_health_check(),
channel_status_new_waiting_for_health_check(ChannelConfig),
AddedChannelsMap
),
NewData = Data#data{
@ -720,7 +720,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status(Error),
channel_status(Error, ChannelConfig),
AddedChannelsMap
),
NewData = Data#data{
@ -835,7 +835,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
maps:get(
ChannelId,
Channels,
channel_status()
channel_status_not_added(Config)
)
)
of
@ -843,7 +843,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
%% The channel is not installed in the connector state
%% We insert it into the channels map and let the health check
%% take care of the rest
NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels),
NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels),
NewData = Data#data{added_channels = NewChannels},
{keep_state, update_state(NewData, Data), [
{reply, From, ok}
@ -854,17 +854,21 @@ handle_add_channel(From, Data, ChannelId, Config) ->
{keep_state_and_data, [{reply, From, ok}]}
end.
handle_not_connected_add_channel(From, ChannelId, State, Data) ->
handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
%% When state is not connected the channel will be added to the channels
%% map but nothing else will happen.
NewData = add_channel_status_if_not_exists(Data, ChannelId, State),
NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels,
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of
case
channel_status_is_channel_added(
maps:get(ChannelId, Channels, channel_status_not_added(undefined))
)
of
false ->
%% The channel is already not installed in the connector state.
%% We still need to remove it from the added_channels map
@ -1033,7 +1037,10 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
end.
handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
{keep_state_and_data, [
{reply, From,
maps:remove(config, channel_status({error, resource_disconnected}, undefined))}
]};
handle_manual_channel_health_check(
From,
#data{
@ -1066,13 +1073,15 @@ handle_manual_channel_health_check(
is_map_key(ChannelId, Channels)
->
%% No ongoing health check: reply with current status.
{keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
{keep_state_and_data, [{reply, From, maps:remove(config, maps:get(ChannelId, Channels))}]};
handle_manual_channel_health_check(
From,
_Data,
_ChannelId
) ->
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
{keep_state_and_data, [
{reply, From, maps:remove(config, channel_status({error, channel_not_found}, undefined))}
]}.
-spec channels_health_check(resource_status(), data()) -> data().
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
@ -1097,14 +1106,14 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
{ChannelId, Config}
|| {ChannelId, #{config := Config} = Status} <- maps:to_list(Channels),
channel_status_is_channel_added(Status)
],
ChannelsWithNewStatuses =
[
{ChannelId, channel_status({?status_connecting, resource_is_connecting})}
|| ChannelId <- ChannelsToChangeStatusFor
{ChannelId, channel_status({?status_connecting, resource_is_connecting}, Config)}
|| {ChannelId, Config} <- ChannelsToChangeStatusFor
],
%% Update the channels map
NewChannels = lists:foldl(
@ -1149,9 +1158,10 @@ channels_health_check(ConnectorStatus, Data0) ->
ConnectorStatus,
ChannelId,
Data1
)}
)},
Config
)}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels)
|| {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
],
%% Raise alarms
_ = lists:foreach(
@ -1218,14 +1228,29 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0})
start_channel_health_check(Data1, ChannelId)
end.
-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data().
continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
-spec continue_channel_health_check_connected(
channel_id(), channel_status_map(), channel_status_map(), data()
) -> data().
continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Data0) ->
#data{hc_workers = HCWorkers0} = Data0,
#{channel := CHCWorkers0} = HCWorkers0,
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
%% Remove the added channels with a a status different from connected or connecting
NewStatus = maps:get(ChannelId, Data0#data.added_channels),
case OldStatus =:= CurrentStatus of
true ->
continue_channel_health_check_connected_no_update_during_check(
ChannelId, OldStatus, Data1
);
false ->
%% Channel has been updated while the health check process was working so
%% we should not clear any alarm or remove the channel from the
%% connector
Data1
end.
continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) ->
%% Remove the added channels with a status different from connected or connecting
NewStatus = maps:get(ChannelId, Data1#data.added_channels),
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
%% Raise/clear alarms
@ -1253,9 +1278,11 @@ spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
%% separated so it can be spec'ed and placate dialyzer tantrums...
-spec worker_channel_health_check(data(), channel_id()) -> no_return().
worker_channel_health_check(Data, ChannelId) ->
#data{id = ResId, mod = Mod, state = State} = Data,
#data{id = ResId, mod = Mod, state = State, added_channels = Channels} = Data,
ChannelStatus = maps:get(ChannelId, Channels, #{}),
ChannelConfig = maps:get(config, ChannelStatus, undefined),
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
exit({ok, channel_status(RawStatus)}).
exit({ok, channel_status(RawStatus, ChannelConfig)}).
-spec handle_channel_health_check_worker_down(
data(), {pid(), reference()}, {ok, channel_status_map()}
@ -1267,11 +1294,15 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
added_channels = AddedChannels0
} = Data0,
{ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
case ExitResult of
{ok, NewStatus} ->
%% `emqx_resource:call_channel_health_check' catches all exceptions.
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
end,
%% The channel might have got removed while the health check was going on
CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added),
{AddedChannels, NewStatus} =
handle_channel_health_check_worker_down_new_channels_and_status(
ChannelId,
ExitResult,
CurrentStatus,
AddedChannels0
),
#{ongoing := Ongoing0} = CHCWorkers1,
{PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
@ -1283,19 +1314,52 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
CHCWorkers = CHCWorkers3#{pending := Rest},
HCWorkers = HCWorkers0#{channel := CHCWorkers},
Data3 = Data2#data{hc_workers = HCWorkers},
Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
Data4 = continue_channel_health_check_connected(
ChannelId,
PreviousChanStatus,
CurrentStatus,
Data3
),
Data = start_channel_health_check(Data4, NextChannelId),
{keep_state, update_state(Data, Data0), Replies};
#{pending := []} ->
HCWorkers = HCWorkers0#{channel := CHCWorkers3},
Data3 = Data2#data{hc_workers = HCWorkers},
Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
Data = continue_channel_health_check_connected(
ChannelId,
PreviousChanStatus,
CurrentStatus,
Data3
),
{keep_state, update_state(Data, Data0), Replies}
end.
reply_pending_channel_health_check_callers(
ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
handle_channel_health_check_worker_down_new_channels_and_status(
ChannelId,
{ok, #{config := CheckedConfig} = NewStatus} = _ExitResult,
#{config := CurrentConfig} = _CurrentStatus,
AddedChannels
) when CheckedConfig =:= CurrentConfig ->
%% Checked config is the same as the current config so we can update the
%% status in AddedChannels
{maps:put(ChannelId, NewStatus, AddedChannels), NewStatus};
handle_channel_health_check_worker_down_new_channels_and_status(
_ChannelId,
{ok, NewStatus} = _ExitResult,
_CurrentStatus,
AddedChannels
) ->
%% The checked config is different from the current config which means we
%% should not update AddedChannels because the channel has been removed or
%% updated while the health check was in progress. We can still reply with
%% NewStatus because the health check must have been issued before the
%% configuration changed or the channel got removed.
{AddedChannels, NewStatus}.
reply_pending_channel_health_check_callers(
ChannelId, Status0, Data0 = #data{hc_pending_callers = Pending0}
) ->
Status = maps:remove(config, Status0),
#{channel := CPending0} = Pending0,
Pending = maps:get(ChannelId, CPending0, []),
Actions = [{reply, From, Status} || From <- Pending],
@ -1367,9 +1431,13 @@ maybe_alarm(_Status, _ResId, Error, Error) ->
maybe_alarm(_Status, ResId, Error, _PrevError) ->
HrError =
case Error of
{error, undefined} -> <<"Unknown reason">>;
{error, Reason} -> emqx_utils:readable_error_msg(Reason);
_ -> emqx_utils:readable_error_msg(Error)
{error, undefined} ->
<<"Unknown reason">>;
{error, Reason} ->
emqx_utils:readable_error_msg(Reason);
_ ->
Error1 = redact_config_from_error_status(Error),
emqx_utils:readable_error_msg(Error1)
end,
emqx_alarm:safe_activate(
ResId,
@ -1378,6 +1446,11 @@ maybe_alarm(_Status, ResId, Error, _PrevError) ->
),
?tp(resource_activate_alarm, #{resource_id => ResId}).
redact_config_from_error_status(#{config := _} = ErrorStatus) ->
maps:remove(config, ErrorStatus);
redact_config_from_error_status(Error) ->
Error.
-spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok.
maybe_resume_resource_workers(ResId, ?status_connected) ->
lists:foreach(
@ -1426,6 +1499,11 @@ maybe_reply(Actions, From, Reply) ->
-spec data_record_to_external_map(data()) -> resource_data().
data_record_to_external_map(Data) ->
AddedChannelsWithoutConfigs =
maps:map(
fun(_ChanID, Status) -> maps:remove(config, Status) end,
Data#data.added_channels
),
#{
id => Data#data.id,
error => external_error(Data#data.error),
@ -1435,7 +1513,7 @@ data_record_to_external_map(Data) ->
config => Data#data.config,
status => Data#data.status,
state => Data#data.state,
added_channels => Data#data.added_channels
added_channels => AddedChannelsWithoutConfigs
}.
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}.
@ -1469,7 +1547,7 @@ safe_call(ResId, Message, Timeout) ->
%% Helper functions for chanel status data
channel_status() ->
channel_status_not_added(ChannelConfig) ->
#{
%% The status of the channel. Can be one of the following:
%% - disconnected: the channel is not added to the resource (error may contain the reason))
@ -1479,62 +1557,61 @@ channel_status() ->
%% - connected: the channel is added to the resource, the resource is
%% connected and the on_channel_get_status callback has returned
%% connected. The error field should be undefined.
status => ?status_disconnected,
error => not_added_yet
}.
%% If the channel is added with add_channel/2, the config field will be set to
%% the config. This is useful when doing probing since the config is not stored
%% anywhere else in that case.
channel_status_new_with_config(Config) ->
#{
status => ?status_disconnected,
error => not_added_yet,
config => Config
config => ChannelConfig
}.
channel_status_new_waiting_for_health_check() ->
channel_status_new_waiting_for_health_check(ChannelConfig) ->
#{
status => ?status_connecting,
error => no_health_check_yet
error => no_health_check_yet,
config => ChannelConfig
}.
channel_status({?status_connecting, Error}) ->
channel_status({?status_connecting, Error}, ChannelConfig) ->
#{
status => ?status_connecting,
error => Error
error => Error,
config => ChannelConfig
};
channel_status({?status_disconnected, Error}) ->
channel_status({?status_disconnected, Error}, ChannelConfig) ->
#{
status => ?status_disconnected,
error => Error
error => Error,
config => ChannelConfig
};
channel_status(?status_disconnected) ->
channel_status(?status_disconnected, ChannelConfig) ->
#{
status => ?status_disconnected,
error => <<"Disconnected for unknown reason">>
error => <<"Disconnected for unknown reason">>,
config => ChannelConfig
};
channel_status(?status_connecting) ->
channel_status(?status_connecting, ChannelConfig) ->
#{
status => ?status_connecting,
error => <<"Not connected for unknown reason">>
error => <<"Not connected for unknown reason">>,
config => ChannelConfig
};
channel_status(?status_connected) ->
channel_status(?status_connected, ChannelConfig) ->
#{
status => ?status_connected,
error => undefined
error => undefined,
config => ChannelConfig
};
%% Probably not so useful but it is permitted to set an error even when the
%% status is connected
channel_status({?status_connected, Error}) ->
channel_status({?status_connected, Error}, ChannelConfig) ->
#{
status => ?status_connected,
error => Error
error => Error,
config => ChannelConfig
};
channel_status({error, Reason}) ->
channel_status({error, Reason}, ChannelConfig) ->
#{
status => ?status_disconnected,
error => Reason
error => Reason,
config => ChannelConfig
}.
channel_status_is_channel_added(#{
@ -1548,19 +1625,14 @@ channel_status_is_channel_added(#{
channel_status_is_channel_added(_Status) ->
false.
-spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data().
add_channel_status_if_not_exists(Data, ChannelId, State) ->
-spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data().
add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
Channels = Data#data.added_channels,
case maps:is_key(ChannelId, Channels) of
true ->
Data;
false ->
ChannelStatus = channel_status({error, resource_not_operational}),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}
end.
ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}.
state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected;

View File

@ -91,13 +91,16 @@ end_per_testcase(_TestCase, _Config) ->
ok.
t_basic_apply_rule_trace_ruleid(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, false).
basic_apply_rule_test_helper(get_action(Config), ruleid, false, text).
t_basic_apply_rule_trace_ruleid_hidden_payload(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, false, hidden).
t_basic_apply_rule_trace_clientid(Config) ->
basic_apply_rule_test_helper(get_action(Config), clientid, false).
basic_apply_rule_test_helper(get_action(Config), clientid, false, text).
t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, true).
basic_apply_rule_test_helper(get_action(Config), ruleid, true, text).
get_action(Config) ->
case ?config(group_name, Config) of
@ -135,10 +138,10 @@ republish_action() ->
console_print_action() ->
#{<<"function">> => <<"console">>}.
basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
%% Create Rule
RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
{ok, #{<<"id">> := RuleId}} =
emqx_bridge_testlib:create_rule_and_action(
Action,
@ -157,12 +160,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
clientid ->
ClientId
end,
create_trace(TraceName, TraceType, TraceValue),
create_trace(TraceName, TraceType, TraceValue, PayloadEncode),
%% ===================================
Context = #{
clientid => ClientId,
event_type => message_publish,
payload => <<"{\"msg\": \"hello\"}">>,
payload => <<"{\"msg\": \"my_payload_msg\"}">>,
qos => 1,
topic => RuleTopic,
username => <<"u_emqx">>
@ -179,6 +182,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG:~n~s", [Bin]),
case PayloadEncode of
hidden ->
?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]));
text ->
?assertNotEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]))
end,
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
case Action of
@ -273,7 +282,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
do_final_log_check(_, _) ->
ok.
create_trace(TraceName, TraceType, TraceValue) ->
create_trace(TraceName, TraceType, TraceValue, PayloadEncode) ->
Now = erlang:system_time(second) - 10,
Start = Now,
End = Now + 60,
@ -283,7 +292,8 @@ create_trace(TraceName, TraceType, TraceValue) ->
TraceType => TraceValue,
start_at => Start,
end_at => End,
formatter => json
formatter => json,
payload_encode => PayloadEncode
},
{ok, _} = CreateRes = emqx_trace:create(Trace),
emqx_common_test_helpers:on_exit(fun() ->
@ -323,7 +333,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
?FUNCTION_NAME,
SQL
),
create_trace(Name, ruleid, RuleID),
create_trace(Name, ruleid, RuleID, text),
Now = erlang:system_time(second) - 10,
%% Stop
ParmsStopAfterRender = apply_rule_parms(true, Name),
@ -588,7 +598,7 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun)
?FUNCTION_NAME,
SQL
),
create_trace(Name, ruleid, RuleID),
create_trace(Name, ruleid, RuleID, text),
Now = erlang:system_time(second) - 10,
%% Stop
ParmsNoStopAfterRender = apply_rule_parms(false, Name),

2
build
View File

@ -145,7 +145,7 @@ make_docs() {
scripts/merge-i18n.escript | jq --sort-keys . > "$desc"
else
# it is not a big deal if we cannot generate the desc
log_red "NOT Generated: $desc"
log_red "NOT Generated: $desc due to jq command missing."
fi
}

View File

@ -0,0 +1,10 @@
Various fixes related to the `durable_sessions` feature:
- Add an option to execute read operations on the leader.
- `drop_generation` operation can be replayed multiple times by the replication layer, but it's not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed. In the future, however, we want to make `drop_generation` idempotent in a nicer way.
- Wrap storage layer events in a small structure containing the generation ID, to make sure events are handled by the same layout CBM & context that produced them.
- Fix crash when storage event arrives to the dropped generation (now removed `storage_layer:generation_at` function didn't handle the case of dropped generations).
- Implement `format_status` callback for several workers to minimize log spam
- Move the responsibility of `end_of_stream` detection to the layout CBM. Previously storage layer used a heuristic: old generations that return an empty batch won't produce more data. This was, obviously, incorrect: for example, bitfield-LTS layout MAY return empty batch while waiting for safe cutoff time.
- `reference` layout has been enabled in prod build. It could be useful for integration testing.
- Fix incorrect epoch calculation in `bitfield_lts:handle_event` callback that lead to missed safe cutoff time updates, and effectively, subscribers being unable to fetch messages until a fresh batch was published.

View File

@ -0,0 +1 @@
Updates to action configurations would sometimes not take effect without disabling and enabling the action. This means that an action could sometimes run with the old (previous) configuration even though it would look like the action configuration has been updated successfully.

View File

@ -0,0 +1 @@
Attempting to start an action or source whose connector is disabled will no longer attempt to start the connector itself.

View File

@ -90,11 +90,20 @@ wildcard_optimized_epoch_bits.desc:
Time span covered by each epoch grows exponentially with the value of `epoch_bits`:
- `epoch_bits = 1`: epoch time = 1 millisecond
- `epoch_bits = 2`: 2 milliseconds
- `epoch_bits = 1`: epoch time = 2 microseconds
- `epoch_bits = 2`: 4 microseconds
...
- `epoch_bits = 10`: 1024 milliseconds
- `epoch_bits = 13`: ~8 seconds
- `epoch_bits = 20`: ~1s
...~"""
layout_builtin_reference.label: "Reference layout"
layout_builtin_reference.desc:
"""~
A simplistic layout type that stores all messages from all topics in chronological order in a single stream.
Not recommended for production use.~"""
layout_builtin_reference_type.label: "Layout type"
layout_builtin_reference_type.desc: "Reference layout type."
}

View File

@ -0,0 +1,49 @@
node {
name = "emqx@127.0.0.1"
cookie = "emqxsecretcookie"
data_dir = "data"
}
actions {
s3 {
s3direct {
connector = s3local
enable = true
parameters {
acl = private
bucket = direct
content = "${.}"
headers {}
key = "${clientid}/${id}"
}
resource_opts {
health_check_interval = 15s
inflight_window = 100
max_buffer_bytes = 256MB
query_mode = async
request_ttl = 45s
worker_pool_size = 16
}
}
}
}
connectors {
s3 {
s3local {
access_key_id = ACCESS
host = localhost
port = 9000
resource_opts {health_check_interval = 15s, start_timeout = 5s}
secret_access_key = SECRET
transport_options {
connect_timeout = 15s
enable_pipelining = 100
headers {}
ipv6_probe = false
pool_size = 8
pool_type = random
ssl {enable = false, verify = verify_peer}
}
}
}
}

View File

@ -4,17 +4,8 @@ set -euo pipefail
[ "${DEBUG:-0}" -eq 1 ] && set -x
## rebar3 tag 3.19.0-emqx-1 is compiled using latest official OTP-24 image.
## we have to use an otp24-compiled rebar3 because the defination of record #application{}
## in systools.hrl is changed in otp24.
OTP_VSN="${OTP_VSN:-$(./scripts/get-otp-vsn.sh)}"
case ${OTP_VSN} in
23*)
VERSION="3.16.1-emqx-1"
;;
24*)
VERSION="3.18.0-emqx-1"
;;
25*)
VERSION="3.19.0-emqx-9"
;;
@ -22,7 +13,7 @@ case ${OTP_VSN} in
VERSION="3.20.0-emqx-1"
;;
*)
echo "Unsupporetd Erlang/OTP version $OTP_VSN"
echo "Unsupported Erlang/OTP version $OTP_VSN"
exit 1
;;
esac

View File

@ -2,4 +2,4 @@
set -euo pipefail
erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().'
erl -noshell -eval '{ok, Version} = file:read_file(filename:join([code:root_dir(), "releases", erlang:system_info(otp_release), "OTP_VERSION"])), io:fwrite(Version), halt().'