Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240720-021658
This commit is contained in:
commit
da5ec54612
|
@ -60,7 +60,6 @@
|
||||||
{emqx_node_rebalance_api,1}.
|
{emqx_node_rebalance_api,1}.
|
||||||
{emqx_node_rebalance_api,2}.
|
{emqx_node_rebalance_api,2}.
|
||||||
{emqx_node_rebalance_evacuation,1}.
|
{emqx_node_rebalance_evacuation,1}.
|
||||||
{emqx_node_rebalance_purge,1}.
|
|
||||||
{emqx_node_rebalance_status,1}.
|
{emqx_node_rebalance_status,1}.
|
||||||
{emqx_node_rebalance_status,2}.
|
{emqx_node_rebalance_status,2}.
|
||||||
{emqx_persistent_session_ds,1}.
|
{emqx_persistent_session_ds,1}.
|
||||||
|
|
|
@ -60,7 +60,8 @@
|
||||||
{emqx_statsd, 1},
|
{emqx_statsd, 1},
|
||||||
{emqx_plugin_libs, 1},
|
{emqx_plugin_libs, 1},
|
||||||
{emqx_persistent_session, 1},
|
{emqx_persistent_session, 1},
|
||||||
{emqx_ds, 3}
|
{emqx_ds, 3},
|
||||||
|
{emqx_node_rebalance_purge, 1}
|
||||||
]).
|
]).
|
||||||
%% List of known RPC backend modules:
|
%% List of known RPC backend modules:
|
||||||
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
||||||
|
|
|
@ -62,9 +62,11 @@
|
||||||
node := _,
|
node := _,
|
||||||
payload := _,
|
payload := _,
|
||||||
peername := _,
|
peername := _,
|
||||||
|
pub_props := _,
|
||||||
publish_received_at := _,
|
publish_received_at := _,
|
||||||
qos := _,
|
qos := _,
|
||||||
retain := _,
|
retain := _,
|
||||||
|
timestamp := _,
|
||||||
topic := _,
|
topic := _,
|
||||||
user_property := _,
|
user_property := _,
|
||||||
username := _,
|
username := _,
|
||||||
|
@ -345,6 +347,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
||||||
undefined
|
undefined
|
||||||
end,
|
end,
|
||||||
Username = maps:get(username, Headers, undefined),
|
Username = maps:get(username, Headers, undefined),
|
||||||
|
Timestamp = erlang:system_time(millisecond),
|
||||||
#{
|
#{
|
||||||
dirty => Dirty,
|
dirty => Dirty,
|
||||||
|
|
||||||
|
@ -355,9 +358,11 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
||||||
node => node(),
|
node => node(),
|
||||||
payload => Payload,
|
payload => Payload,
|
||||||
peername => Peername,
|
peername => Peername,
|
||||||
|
pub_props => Props,
|
||||||
publish_received_at => Message#message.timestamp,
|
publish_received_at => Message#message.timestamp,
|
||||||
qos => Message#message.qos,
|
qos => Message#message.qos,
|
||||||
retain => emqx_message:get_flag(retain, Message, false),
|
retain => emqx_message:get_flag(retain, Message, false),
|
||||||
|
timestamp => Timestamp,
|
||||||
topic => Message#message.topic,
|
topic => Message#message.topic,
|
||||||
user_property => UserProperties,
|
user_property => UserProperties,
|
||||||
username => Username
|
username => Username
|
||||||
|
@ -462,6 +467,17 @@ decode(
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{error, TraceFailureContext};
|
{error, TraceFailureContext};
|
||||||
|
throw:{schema_decode_error, ExtraContext} ->
|
||||||
|
TraceFailureContext = #trace_failure_context{
|
||||||
|
transformation = Transformation,
|
||||||
|
tag = "payload_decode_error",
|
||||||
|
context = ExtraContext#{
|
||||||
|
decoder => protobuf,
|
||||||
|
schema_name => SerdeName,
|
||||||
|
message_type => MessageType
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{error, TraceFailureContext};
|
||||||
Class:Error:Stacktrace ->
|
Class:Error:Stacktrace ->
|
||||||
TraceFailureContext = #trace_failure_context{
|
TraceFailureContext = #trace_failure_context{
|
||||||
transformation = Transformation,
|
transformation = Transformation,
|
||||||
|
|
|
@ -108,8 +108,7 @@ fields(transformation) ->
|
||||||
hoconsc:array(ref(operation)),
|
hoconsc:array(ref(operation)),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("operation"),
|
desc => ?DESC("operation"),
|
||||||
required => true,
|
default => []
|
||||||
validator => fun validate_operations/1
|
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
|
@ -253,11 +252,6 @@ validate_unique_topics(Topics) ->
|
||||||
{error, Msg}
|
{error, Msg}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
validate_operations([]) ->
|
|
||||||
{error, <<"at least one operation must be defined">>};
|
|
||||||
validate_operations([_ | _]) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
compile_variform(Expression, #{make_serializable := true}) ->
|
compile_variform(Expression, #{make_serializable := true}) ->
|
||||||
case is_binary(Expression) of
|
case is_binary(Expression) of
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -504,7 +504,7 @@ assert_monitor_metrics() ->
|
||||||
receive
|
receive
|
||||||
PATTERN = ____Msg0 -> ____Msg0
|
PATTERN = ____Msg0 -> ____Msg0
|
||||||
after TIMEOUT ->
|
after TIMEOUT ->
|
||||||
error({message_not_received, ?LINE})
|
error({message_not_received, {line, ?LINE}})
|
||||||
end
|
end
|
||||||
end)()
|
end)()
|
||||||
).
|
).
|
||||||
|
@ -608,6 +608,8 @@ t_smoke_test(_Config) ->
|
||||||
%% * peername
|
%% * peername
|
||||||
%% * publish_received_at
|
%% * publish_received_at
|
||||||
%% * username
|
%% * username
|
||||||
|
%% * timestamp
|
||||||
|
%% * pub_props (and specific fields within containing hyphens)
|
||||||
t_smoke_test_2(_Config) ->
|
t_smoke_test_2(_Config) ->
|
||||||
Name1 = <<"foo">>,
|
Name1 = <<"foo">>,
|
||||||
Operations = [
|
Operations = [
|
||||||
|
@ -617,14 +619,22 @@ t_smoke_test_2(_Config) ->
|
||||||
operation(<<"payload.peername">>, <<"peername">>),
|
operation(<<"payload.peername">>, <<"peername">>),
|
||||||
operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
|
operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
|
||||||
operation(<<"payload.username">>, <<"username">>),
|
operation(<<"payload.username">>, <<"username">>),
|
||||||
operation(<<"payload.flags">>, <<"flags">>)
|
operation(<<"payload.flags">>, <<"flags">>),
|
||||||
|
operation(<<"payload.timestamp">>, <<"timestamp">>),
|
||||||
|
operation(<<"payload.pub_props">>, <<"pub_props">>),
|
||||||
|
operation(<<"payload.content_type">>, <<"pub_props.Content-Type">>)
|
||||||
],
|
],
|
||||||
Transformation1 = transformation(Name1, Operations),
|
Transformation1 = transformation(Name1, Operations),
|
||||||
{201, _} = insert(Transformation1),
|
{201, _} = insert(Transformation1),
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
C1 = connect(ClientId),
|
C1 = connect(ClientId),
|
||||||
{ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
|
{ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
|
||||||
ok = publish(C1, <<"t/1">>, #{}),
|
ok = publish(C1, <<"t/1">>, #{}, _QoS = 0, #{
|
||||||
|
props => #{
|
||||||
|
'Content-Type' => <<"application/json">>,
|
||||||
|
'User-Property' => [{<<"a">>, <<"b">>}]
|
||||||
|
}
|
||||||
|
}),
|
||||||
{publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
|
{publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
|
||||||
NodeBin = atom_to_binary(node()),
|
NodeBin = atom_to_binary(node()),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -635,7 +645,13 @@ t_smoke_test_2(_Config) ->
|
||||||
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
||||||
<<"publish_received_at">> := PRAt,
|
<<"publish_received_at">> := PRAt,
|
||||||
<<"username">> := <<"undefined">>,
|
<<"username">> := <<"undefined">>,
|
||||||
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
|
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
|
||||||
|
<<"timestamp">> := _,
|
||||||
|
<<"pub_props">> := #{
|
||||||
|
<<"Content-Type">> := <<"application/json">>,
|
||||||
|
<<"User-Property">> := #{<<"a">> := <<"b">>}
|
||||||
|
},
|
||||||
|
<<"content_type">> := <<"application/json">>
|
||||||
} when is_integer(PRAt),
|
} when is_integer(PRAt),
|
||||||
emqx_utils_json:decode(Payload0, [return_maps])
|
emqx_utils_json:decode(Payload0, [return_maps])
|
||||||
),
|
),
|
||||||
|
@ -644,7 +660,12 @@ t_smoke_test_2(_Config) ->
|
||||||
Username = <<"myusername">>,
|
Username = <<"myusername">>,
|
||||||
C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
|
C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
|
||||||
{ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
|
{ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
|
||||||
ok = publish(C2, <<"t/1">>, #{}),
|
ok = publish(C2, <<"t/1">>, #{}, _QoS = 0, #{
|
||||||
|
props => #{
|
||||||
|
'Content-Type' => <<"application/json">>,
|
||||||
|
'User-Property' => [{<<"a">>, <<"b">>}]
|
||||||
|
}
|
||||||
|
}),
|
||||||
{publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
|
{publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
|
@ -654,7 +675,13 @@ t_smoke_test_2(_Config) ->
|
||||||
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
<<"peername">> := <<"127.0.0.1:", _/binary>>,
|
||||||
<<"publish_received_at">> := PRAt,
|
<<"publish_received_at">> := PRAt,
|
||||||
<<"username">> := Username,
|
<<"username">> := Username,
|
||||||
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
|
<<"flags">> := #{<<"dup">> := false, <<"retain">> := false},
|
||||||
|
<<"timestamp">> := _,
|
||||||
|
<<"pub_props">> := #{
|
||||||
|
<<"Content-Type">> := <<"application/json">>,
|
||||||
|
<<"User-Property">> := #{<<"a">> := <<"b">>}
|
||||||
|
},
|
||||||
|
<<"content_type">> := <<"application/json">>
|
||||||
} when is_integer(PRAt),
|
} when is_integer(PRAt),
|
||||||
emqx_utils_json:decode(Payload1, [return_maps])
|
emqx_utils_json:decode(Payload1, [return_maps])
|
||||||
),
|
),
|
||||||
|
@ -1367,6 +1394,94 @@ t_protobuf(_Config) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Checks what happens if a wrong transformation chain is used. In this case, the second
|
||||||
|
%% transformation attempts to protobuf-decode a message that was already decoded but not
|
||||||
|
%% re-encoded by the first transformation.
|
||||||
|
t_protobuf_bad_chain(_Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
SerdeName = <<"myserde">>,
|
||||||
|
MessageType = <<"Person">>,
|
||||||
|
protobuf_create_serde(SerdeName),
|
||||||
|
|
||||||
|
Name1 = <<"foo">>,
|
||||||
|
PayloadSerde = #{
|
||||||
|
<<"type">> => <<"protobuf">>,
|
||||||
|
<<"schema">> => SerdeName,
|
||||||
|
<<"message_type">> => MessageType
|
||||||
|
},
|
||||||
|
NoneSerde = #{<<"type">> => <<"none">>},
|
||||||
|
JSONSerde = #{<<"type">> => <<"json">>},
|
||||||
|
|
||||||
|
Transformation1 = transformation(Name1, _Ops1 = [], #{
|
||||||
|
<<"payload_decoder">> => PayloadSerde,
|
||||||
|
<<"payload_encoder">> => NoneSerde
|
||||||
|
}),
|
||||||
|
{201, _} = insert(Transformation1),
|
||||||
|
|
||||||
|
%% WRONG: after the first transformation, payload is already decoded, so we
|
||||||
|
%% shouldn't use protobuf again.
|
||||||
|
Name2 = <<"bar">>,
|
||||||
|
Transformation2A = transformation(Name2, [], #{
|
||||||
|
<<"payload_decoder">> => PayloadSerde,
|
||||||
|
<<"payload_encoder">> => JSONSerde
|
||||||
|
}),
|
||||||
|
{201, _} = insert(Transformation2A),
|
||||||
|
|
||||||
|
C = connect(<<"c1">>),
|
||||||
|
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
|
||||||
|
|
||||||
|
[Payload | _] = protobuf_valid_payloads(SerdeName, MessageType),
|
||||||
|
ok = publish(C, <<"t/1">>, {raw, Payload}),
|
||||||
|
?assertNotReceive({publish, _}),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
ct:pal("trace:\n ~p", [Trace]),
|
||||||
|
?assertMatch(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
E
|
||||||
|
|| #{
|
||||||
|
?snk_kind := message_transformation_failed,
|
||||||
|
message := "payload_decode_schema_failure",
|
||||||
|
reason := function_clause
|
||||||
|
} = E <- Trace
|
||||||
|
]
|
||||||
|
),
|
||||||
|
%% No unexpected crashes
|
||||||
|
?assertMatch(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
E
|
||||||
|
|| #{
|
||||||
|
?snk_kind := message_transformation_failed,
|
||||||
|
stacktrace := _
|
||||||
|
} = E <- Trace
|
||||||
|
]
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
explain :=
|
||||||
|
<<"Attempted to schema decode an already decoded message.", _/binary>>
|
||||||
|
}
|
||||||
|
| _
|
||||||
|
],
|
||||||
|
[
|
||||||
|
E
|
||||||
|
|| #{
|
||||||
|
?snk_kind := message_transformation_failed,
|
||||||
|
message := "payload_decode_error"
|
||||||
|
} = E <- Trace
|
||||||
|
]
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% Tests that restoring a backup config works.
|
%% Tests that restoring a backup config works.
|
||||||
%% * Existing transformations (identified by `name') are left untouched.
|
%% * Existing transformations (identified by `name') are left untouched.
|
||||||
%% * No transformations are removed.
|
%% * No transformations are removed.
|
||||||
|
|
|
@ -114,14 +114,9 @@ schema_test_() ->
|
||||||
transformation(<<"foo">>, [dummy_operation()])
|
transformation(<<"foo">>, [dummy_operation()])
|
||||||
])
|
])
|
||||||
)},
|
)},
|
||||||
{"operations must be non-empty",
|
{"operations may be empty",
|
||||||
?_assertThrow(
|
?_assertMatch(
|
||||||
{_Schema, [
|
[#{<<"operations">> := []}],
|
||||||
#{
|
|
||||||
reason := <<"at least one operation must be defined">>,
|
|
||||||
kind := validation_error
|
|
||||||
}
|
|
||||||
]},
|
|
||||||
parse_and_check([
|
parse_and_check([
|
||||||
transformation(
|
transformation(
|
||||||
<<"foo">>,
|
<<"foo">>,
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_node_rebalance_purge_proto_v1).
|
|
||||||
|
|
||||||
-behaviour(emqx_bpapi).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
introduced_in/0,
|
|
||||||
|
|
||||||
start/2,
|
|
||||||
stop/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
|
||||||
|
|
||||||
introduced_in() ->
|
|
||||||
"5.2.1".
|
|
||||||
|
|
||||||
-spec start([node()], emqx_node_rebalance_purge:start_opts()) ->
|
|
||||||
emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:start_error()}).
|
|
||||||
start(Nodes, Opts) ->
|
|
||||||
erpc:multicall(Nodes, emqx_node_rebalance_purge, start, [Opts]).
|
|
||||||
|
|
||||||
-spec stop([node()]) ->
|
|
||||||
emqx_rpc:erpc_multicall(ok | {error, emqx_node_rebalance_purge:stop_error()}).
|
|
||||||
stop(Nodes) ->
|
|
||||||
erpc:multicall(Nodes, emqx_node_rebalance_purge, stop, []).
|
|
|
@ -326,6 +326,13 @@ fields("ctx_schema_validation_failed") ->
|
||||||
{"event_type", event_type_sc(Event)},
|
{"event_type", event_type_sc(Event)},
|
||||||
{"validation", sc(binary(), #{desc => ?DESC("event_validation")})}
|
{"validation", sc(binary(), #{desc => ?DESC("event_validation")})}
|
||||||
| msg_event_common_fields()
|
| msg_event_common_fields()
|
||||||
|
];
|
||||||
|
fields("ctx_message_transformation_failed") ->
|
||||||
|
Event = 'message.transformation_failed',
|
||||||
|
[
|
||||||
|
{"event_type", event_type_sc(Event)},
|
||||||
|
{"transformation", sc(binary(), #{desc => ?DESC("event_transformation")})}
|
||||||
|
| msg_event_common_fields()
|
||||||
].
|
].
|
||||||
|
|
||||||
rule_input_message_context() ->
|
rule_input_message_context() ->
|
||||||
|
@ -345,7 +352,8 @@ rule_input_message_context() ->
|
||||||
ref("ctx_check_authn_complete"),
|
ref("ctx_check_authn_complete"),
|
||||||
ref("ctx_bridge_mqtt"),
|
ref("ctx_bridge_mqtt"),
|
||||||
ref("ctx_delivery_dropped"),
|
ref("ctx_delivery_dropped"),
|
||||||
ref("ctx_schema_validation_failed")
|
ref("ctx_schema_validation_failed"),
|
||||||
|
ref("ctx_message_transformation_failed")
|
||||||
]),
|
]),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("test_context"),
|
desc => ?DESC("test_context"),
|
||||||
|
|
|
@ -206,6 +206,8 @@ is_test_runtime_env() ->
|
||||||
%% is different from `topic'.
|
%% is different from `topic'.
|
||||||
get_in_topic(#{event_type := schema_validation_failed}) ->
|
get_in_topic(#{event_type := schema_validation_failed}) ->
|
||||||
<<"$events/schema_validation_failed">>;
|
<<"$events/schema_validation_failed">>;
|
||||||
|
get_in_topic(#{event_type := message_transformation_failed}) ->
|
||||||
|
<<"$events/message_transformation_failed">>;
|
||||||
get_in_topic(Context) ->
|
get_in_topic(Context) ->
|
||||||
case maps:find(event_topic, Context) of
|
case maps:find(event_topic, Context) of
|
||||||
{ok, EventTopic} ->
|
{ok, EventTopic} ->
|
||||||
|
|
|
@ -257,6 +257,21 @@ t_ctx_schema_validation_failed(_) ->
|
||||||
Expected = check_result([validation], [], Context),
|
Expected = check_result([validation], [], Context),
|
||||||
do_test(SQL, Context, Expected).
|
do_test(SQL, Context, Expected).
|
||||||
|
|
||||||
|
t_ctx_message_transformation_failed(_) ->
|
||||||
|
SQL =
|
||||||
|
<<"SELECT transformation FROM \"$events/message_transformation_failed\"">>,
|
||||||
|
Context = #{
|
||||||
|
<<"clientid">> => <<"c_emqx">>,
|
||||||
|
<<"event_type">> => <<"message_transformation_failed">>,
|
||||||
|
<<"payload">> => <<"{\"msg\": \"hello\"}">>,
|
||||||
|
<<"qos">> => 1,
|
||||||
|
<<"topic">> => <<"t/a">>,
|
||||||
|
<<"username">> => <<"u_emqx">>,
|
||||||
|
<<"transformation">> => <<"m">>
|
||||||
|
},
|
||||||
|
Expected = check_result([transformation], [], Context),
|
||||||
|
do_test(SQL, Context, Expected).
|
||||||
|
|
||||||
t_mongo_date_function_should_return_string_in_test_env(_) ->
|
t_mongo_date_function_should_return_string_in_test_env(_) ->
|
||||||
SQL =
|
SQL =
|
||||||
<<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,
|
<<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,
|
||||||
|
|
|
@ -90,21 +90,7 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
|
||||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
|
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
|
||||||
);
|
);
|
||||||
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
|
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
|
||||||
try
|
decode(SchemaId, Data, MoreArgs);
|
||||||
decode(SchemaId, Data, MoreArgs)
|
|
||||||
catch
|
|
||||||
error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
|
|
||||||
throw(
|
|
||||||
{schema_decode_error, #{
|
|
||||||
error_type => decoding_failure,
|
|
||||||
schema_id => SchemaId,
|
|
||||||
data => Data,
|
|
||||||
more_args => MoreArgs,
|
|
||||||
explain =>
|
|
||||||
<<"The given data could not be decoded. Please check the input data and the schema.">>
|
|
||||||
}}
|
|
||||||
)
|
|
||||||
end;
|
|
||||||
handle_rule_function(schema_decode, Args) ->
|
handle_rule_function(schema_decode, Args) ->
|
||||||
error({args_count_error, {schema_decode, Args}});
|
error({args_count_error, {schema_decode, Args}});
|
||||||
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
|
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
|
||||||
|
@ -162,7 +148,17 @@ encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
|
||||||
with_serde(Name, F) ->
|
with_serde(Name, F) ->
|
||||||
case emqx_schema_registry:get_serde(Name) of
|
case emqx_schema_registry:get_serde(Name) of
|
||||||
{ok, Serde} ->
|
{ok, Serde} ->
|
||||||
F(Serde);
|
Meta =
|
||||||
|
case logger:get_process_metadata() of
|
||||||
|
undefined -> #{};
|
||||||
|
Meta0 -> Meta0
|
||||||
|
end,
|
||||||
|
logger:update_process_metadata(#{schema_name => Name}),
|
||||||
|
try
|
||||||
|
F(Serde)
|
||||||
|
after
|
||||||
|
logger:set_process_metadata(Meta)
|
||||||
|
end;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
error({serde_not_found, Name})
|
error({serde_not_found, Name})
|
||||||
end.
|
end.
|
||||||
|
@ -199,10 +195,39 @@ make_serde(json, Name, Source) ->
|
||||||
eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
|
eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
|
||||||
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
|
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
|
||||||
avro_binary_decoder:decode(Data, Name, Store, Opts);
|
avro_binary_decoder:decode(Data, Name, Store, Opts);
|
||||||
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
|
eval_decode(#serde{type = protobuf}, [#{} = DecodedData, MessageType]) ->
|
||||||
MessageName = binary_to_existing_atom(MessageName0, utf8),
|
%% Already decoded, so it's an user error.
|
||||||
Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
|
throw(
|
||||||
emqx_utils_maps:binary_key_map(Decoded);
|
{schema_decode_error, #{
|
||||||
|
error_type => decoding_failure,
|
||||||
|
data => DecodedData,
|
||||||
|
message_type => MessageType,
|
||||||
|
explain =>
|
||||||
|
<<
|
||||||
|
"Attempted to schema decode an already decoded message."
|
||||||
|
" Check your rules or transformation pipeline."
|
||||||
|
>>
|
||||||
|
}}
|
||||||
|
);
|
||||||
|
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) ->
|
||||||
|
MessageType = binary_to_existing_atom(MessageType0, utf8),
|
||||||
|
try
|
||||||
|
Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]),
|
||||||
|
emqx_utils_maps:binary_key_map(Decoded)
|
||||||
|
catch
|
||||||
|
error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
|
||||||
|
#{schema_name := SchemaName} = logger:get_process_metadata(),
|
||||||
|
throw(
|
||||||
|
{schema_decode_error, #{
|
||||||
|
error_type => decoding_failure,
|
||||||
|
data => EncodedData,
|
||||||
|
message_type => MessageType,
|
||||||
|
schema_name => SchemaName,
|
||||||
|
explain =>
|
||||||
|
<<"The given data could not be decoded. Please check the input data and the schema.">>
|
||||||
|
}}
|
||||||
|
)
|
||||||
|
end;
|
||||||
eval_decode(#serde{type = json, name = Name}, [Data]) ->
|
eval_decode(#serde{type = json, name = Name}, [Data]) ->
|
||||||
true = is_binary(Data),
|
true = is_binary(Data),
|
||||||
Term = json_decode(Data),
|
Term = json_decode(Data),
|
||||||
|
|
|
@ -555,8 +555,8 @@ t_decode_fail(_Config) ->
|
||||||
data := <<"ss">>,
|
data := <<"ss">>,
|
||||||
error_type := decoding_failure,
|
error_type := decoding_failure,
|
||||||
explain := _,
|
explain := _,
|
||||||
more_args := [<<"Person">>],
|
message_type := 'Person',
|
||||||
schema_id := <<"my_serde">>
|
schema_name := <<"my_serde">>
|
||||||
}},
|
}},
|
||||||
emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
|
emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
|
||||||
),
|
),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
Definitions.
|
Definitions.
|
||||||
%% Define regular expressions for tokens
|
%% Define regular expressions for tokens
|
||||||
IDENTIFIER = [a-zA-Z][a-zA-Z0-9_.]*
|
IDENTIFIER = [a-zA-Z][-a-zA-Z0-9_.]*
|
||||||
SQ_STRING = \'[^\']*\'
|
SQ_STRING = \'[^\']*\'
|
||||||
DQ_STRING = \"[^\"]*\"
|
DQ_STRING = \"[^\"]*\"
|
||||||
INTEGER = [+-]?[0-9]+
|
INTEGER = [+-]?[0-9]+
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
|
|
||||||
-define(SYNTAX_ERROR, {error, "syntax error before:" ++ _}).
|
-define(SYNTAX_ERROR, {error, "syntax error before:" ++ _}).
|
||||||
|
|
||||||
redner_test_() ->
|
render_test_() ->
|
||||||
[
|
[
|
||||||
{"direct var reference", fun() -> ?assertEqual({ok, <<"1">>}, render("a", #{a => 1})) end},
|
{"direct var reference", fun() -> ?assertEqual({ok, <<"1">>}, render("a", #{a => 1})) end},
|
||||||
{"concat strings", fun() ->
|
{"concat strings", fun() ->
|
||||||
|
@ -32,6 +32,15 @@ redner_test_() ->
|
||||||
{"concat empty string", fun() ->
|
{"concat empty string", fun() ->
|
||||||
?assertEqual({ok, <<"">>}, render("concat([''])", #{}))
|
?assertEqual({ok, <<"">>}, render("concat([''])", #{}))
|
||||||
end},
|
end},
|
||||||
|
{"identifier with hyphen", fun() ->
|
||||||
|
?assertEqual(
|
||||||
|
{ok, <<"10">>},
|
||||||
|
render(
|
||||||
|
"pub_props.Message-Expiry-Interval",
|
||||||
|
#{pub_props => #{'Message-Expiry-Interval' => 10}}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end},
|
||||||
{"tokens 1st", fun() ->
|
{"tokens 1st", fun() ->
|
||||||
?assertEqual({ok, <<"a">>}, render("nth(1,tokens(var, ','))", #{var => <<"a,b">>}))
|
?assertEqual({ok, <<"a">>}, render("nth(1,tokens(var, ','))", #{var => <<"a,b">>}))
|
||||||
end},
|
end},
|
||||||
|
|
|
@ -13,7 +13,9 @@ connect_timeout.label:
|
||||||
"""Connect Timeout"""
|
"""Connect Timeout"""
|
||||||
|
|
||||||
enable_pipelining.desc:
|
enable_pipelining.desc:
|
||||||
"""A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request."""
|
"""The maximum number of HTTP requests that can be sent before an HTTP response is received.
|
||||||
|
|
||||||
|
Setting this to 1 is equivalent to turning off HTTP pipelining, and the EMQX must receive a response to the previous HTTP request before sending the next HTTP request."""
|
||||||
|
|
||||||
enable_pipelining.label:
|
enable_pipelining.label:
|
||||||
"""HTTP Pipelining"""
|
"""HTTP Pipelining"""
|
||||||
|
|
|
@ -72,7 +72,8 @@ desc_name.label:
|
||||||
"""Action Name"""
|
"""Action Name"""
|
||||||
|
|
||||||
config_parameters_timestamp.desc:
|
config_parameters_timestamp.desc:
|
||||||
"""Timestamp. Placeholders in format of ${var} is supported, the final value can be:</br>
|
"""Timestamp. Placeholders in format of ${var} is supported, the final value can be:
|
||||||
|
|
||||||
- now: use the `now_ms` which is contained in the payload as timestamp
|
- now: use the `now_ms` which is contained in the payload as timestamp
|
||||||
- now_ms: same as above
|
- now_ms: same as above
|
||||||
- now_us: use the `now_us` which is contained in the payload as timestamp
|
- now_us: use the `now_us` which is contained in the payload as timestamp
|
||||||
|
@ -89,8 +90,9 @@ config_parameters_measurement.label:
|
||||||
"""Measurement"""
|
"""Measurement"""
|
||||||
|
|
||||||
config_parameters_data_type.desc:
|
config_parameters_data_type.desc:
|
||||||
"""Data Type, an enumerated or a string. </br>
|
"""Data Type, an enumerated or a string.
|
||||||
For string placeholders in format of ${var} is supported, the final value can be:</br>
|
For string placeholders in format of ${var} is supported, the final value can be:
|
||||||
|
|
||||||
- TEXT
|
- TEXT
|
||||||
- BOOLEAN
|
- BOOLEAN
|
||||||
- INT32
|
- INT32
|
||||||
|
|
|
@ -166,8 +166,7 @@ desc_audit_log_handler.label:
|
||||||
"""Audit Log Handler"""
|
"""Audit Log Handler"""
|
||||||
|
|
||||||
rpc_socket_keepalive_count.desc:
|
rpc_socket_keepalive_count.desc:
|
||||||
"""How many times the keepalive probe message can fail to receive a reply
|
"""Corresponds to the `TCP_KEEPCNT` socket option. The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end."""
|
||||||
until the RPC connection is considered lost."""
|
|
||||||
|
|
||||||
rpc_socket_keepalive_count.label:
|
rpc_socket_keepalive_count.label:
|
||||||
"""RPC Socket Keepalive Count"""
|
"""RPC Socket Keepalive Count"""
|
||||||
|
@ -572,7 +571,7 @@ authorization.label:
|
||||||
"""Authorization"""
|
"""Authorization"""
|
||||||
|
|
||||||
rpc_socket_keepalive_idle.desc:
|
rpc_socket_keepalive_idle.desc:
|
||||||
"""How long the connections between the brokers should remain open after the last message is sent."""
|
"""Corresponds to the `TCP_KEEPIDLE` socket option. The time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes."""
|
||||||
|
|
||||||
rpc_socket_keepalive_idle.label:
|
rpc_socket_keepalive_idle.label:
|
||||||
"""RPC Socket Keepalive Idle"""
|
"""RPC Socket Keepalive Idle"""
|
||||||
|
@ -688,7 +687,7 @@ node_crash_dump_file.label:
|
||||||
"""Crash Dump File"""
|
"""Crash Dump File"""
|
||||||
|
|
||||||
rpc_socket_keepalive_interval.desc:
|
rpc_socket_keepalive_interval.desc:
|
||||||
"""The interval between keepalive messages."""
|
"""Corresponds to the `TCP_KEEPINTVL` socket option. The time (in seconds) between individual keepalive probes."""
|
||||||
|
|
||||||
rpc_socket_keepalive_interval.label:
|
rpc_socket_keepalive_interval.label:
|
||||||
"""RPC Socket Keepalive Interval"""
|
"""RPC Socket Keepalive Interval"""
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
emqx_conf_schema_types {
|
emqx_conf_schema_types {
|
||||||
|
|
||||||
duration.desc:
|
duration.desc:
|
||||||
"""A string that represents a time duration, for example: <code>10s</code>, <code>2.5m</code>, <code>1h30m</code>, <code>1W2D</code>, or <code>2345ms</code>, which is the smallest unit. When precision is specified, finer portions of the duration may be ignored: writing <code>1200ms</code> for <code>Duration(s)</code> is equivalent to writing <code>1s</code>. The unit part is case-insensitive."""
|
"""A string that represents a time duration, for example: <code>10s</code>, <code>2.5m</code>, <code>1h30m</code>, <code>1W2D</code>, or <code>2345ms</code>, which is the smallest unit. Each configuration item has its own minimum precision. The part of the setting value that exceeds the precision will be ignored.
|
||||||
|
|
||||||
|
For example, if a configuration item of type <code>Duration(s)</code> is set to <code>1200ms</code>, the final effective value will be <code>1s</code> instead of <code>1.2s</code>.
|
||||||
|
|
||||||
|
`Duration` is equivalent to `Duration(ms)`. The unit part is case-insensitive."""
|
||||||
|
|
||||||
bytesize.desc:
|
bytesize.desc:
|
||||||
"""A string that represents a number of bytes, for example: <code>10B</code>, <code>640kb</code>, <code>4MB</code>, <code>1GB</code>. Units are interpreted as powers of 1024, and the unit part is case-insensitive."""
|
"""A string that represents a number of bytes, for example: <code>10B</code>, <code>640kb</code>, <code>4MB</code>, <code>1GB</code>. Units are binary standardized, i.e., 1MB equals 1024KB. units are not case sensitive, i.e., 1kb equals 1KB."""
|
||||||
|
|
||||||
secret.desc:
|
secret.desc:
|
||||||
"""A string holding some sensitive information, such as a password. When secret starts with <code>file://</code>, the rest of the string is interpreted as a path to a file containing the secret itself: whole content of the file except any trailing whitespace characters is considered a secret value. Note: when clustered, all EMQX nodes should have the same file present before using <code>file://</code> secrets."""
|
"""A string holding some sensitive information, such as a password. When secret starts with <code>file://</code>, the rest of the string is interpreted as a path to a file containing the secret itself: whole content of the file except any trailing whitespace characters is considered a secret value. Note: when clustered, all EMQX nodes should have the same file present before using <code>file://</code> secrets."""
|
||||||
|
|
|
@ -104,7 +104,7 @@ listeners.label:
|
||||||
"""Listeners"""
|
"""Listeners"""
|
||||||
|
|
||||||
max_connections.desc:
|
max_connections.desc:
|
||||||
"""Maximum number of simultaneous connections."""
|
"""The maximum number of concurrent connections allowed by the listener."""
|
||||||
|
|
||||||
max_connections.label:
|
max_connections.label:
|
||||||
"""Maximum connections"""
|
"""Maximum connections"""
|
||||||
|
|
|
@ -31,7 +31,7 @@ gateway_listeners.desc:
|
||||||
"""The Gateway listeners overview"""
|
"""The Gateway listeners overview"""
|
||||||
|
|
||||||
gateway_max_connections.desc:
|
gateway_max_connections.desc:
|
||||||
"""The Gateway allowed maximum connections/clients"""
|
"""The maximum number of concurrent connections allowed by the gateway."""
|
||||||
|
|
||||||
gateway_name.desc:
|
gateway_name.desc:
|
||||||
"""Gateway Name"""
|
"""Gateway Name"""
|
||||||
|
|
|
@ -34,8 +34,13 @@ gateway_common_idle_timeout.desc:
|
||||||
2. A running client process that does not receive any client requests after this time will go into hibernation to save resources."""
|
2. A running client process that does not receive any client requests after this time will go into hibernation to save resources."""
|
||||||
|
|
||||||
gateway_common_listener_access_rules.desc:
|
gateway_common_listener_access_rules.desc:
|
||||||
"""The access control rules for this listener.
|
"""An access rule list consisting of string rules to restrict or allow access from some addresses.
|
||||||
See: https://github.com/emqtt/esockd#allowdeny"""
|
The rules that appear earlier in the list are matched first.
|
||||||
|
The format is `allow | deny <address> | <CIDR> | all`.
|
||||||
|
|
||||||
|
For example:
|
||||||
|
|
||||||
|
`[\"deny 192.168.1.1\", \"allow 192.168.1.0/24\", \"deny, all\"]`"""
|
||||||
|
|
||||||
gateway_common_listener_bind.desc:
|
gateway_common_listener_bind.desc:
|
||||||
"""The IP address and port that the listener will bind."""
|
"""The IP address and port that the listener will bind."""
|
||||||
|
@ -51,7 +56,7 @@ gateway_common_listener_max_conn_rate.desc:
|
||||||
"""Maximum connections per second."""
|
"""Maximum connections per second."""
|
||||||
|
|
||||||
gateway_common_listener_max_connections.desc:
|
gateway_common_listener_max_connections.desc:
|
||||||
"""Maximum number of concurrent connections."""
|
"""The maximum number of concurrent connections allowed by the listener."""
|
||||||
|
|
||||||
gateway_mountpoint.desc:
|
gateway_mountpoint.desc:
|
||||||
"""When publishing or subscribing, prefix all topics with a mountpoint string.
|
"""When publishing or subscribing, prefix all topics with a mountpoint string.
|
||||||
|
@ -77,8 +82,11 @@ tcp_listener_acceptors.desc:
|
||||||
"""Size of the acceptor pool."""
|
"""Size of the acceptor pool."""
|
||||||
|
|
||||||
tcp_listener_proxy_protocol.desc:
|
tcp_listener_proxy_protocol.desc:
|
||||||
"""Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed behind HAProxy or Nginx.
|
"""If a reverse proxy is deployed for EMQX, and the PROXY protocol is enabled at the proxy to pass the client's real IP,
|
||||||
See: https://www.haproxy.com/blog/haproxy/proxy-protocol/"""
|
this option needs to be turned on so that EMQX can extract the client's real IP from the PROXY protocol header.
|
||||||
|
EMQX will automatically detect the version of the PROXY protocol and support V1 and V2.
|
||||||
|
|
||||||
|
For a detailed description of the PROXY protocol, please refer to: https://www.haproxy.com/blog/haproxy/proxy-protocol/"""
|
||||||
|
|
||||||
tcp_listener_proxy_protocol_timeout.desc:
|
tcp_listener_proxy_protocol_timeout.desc:
|
||||||
"""Timeout for proxy protocol.
|
"""Timeout for proxy protocol.
|
||||||
|
@ -136,10 +144,10 @@ fields_ws_opts_compress.label:
|
||||||
"""Ws compress"""
|
"""Ws compress"""
|
||||||
|
|
||||||
fields_ws_opts_idle_timeout.desc:
|
fields_ws_opts_idle_timeout.desc:
|
||||||
"""Close transport-layer connections from the clients that have not sent MQTT CONNECT message within this interval."""
|
"""The timeout for waiting for the WebSocket upgrade request. After the timeout, the connection will be closed."""
|
||||||
|
|
||||||
fields_ws_opts_idle_timeout.label:
|
fields_ws_opts_idle_timeout.label:
|
||||||
"""WS idle timeout"""
|
"""WebSocket Upgrade Timeout"""
|
||||||
|
|
||||||
fields_ws_opts_max_frame_size.desc:
|
fields_ws_opts_max_frame_size.desc:
|
||||||
"""The maximum length of a single MQTT packet."""
|
"""The maximum length of a single MQTT packet."""
|
||||||
|
|
|
@ -1,33 +1,43 @@
|
||||||
emqx_limiter_schema {
|
emqx_limiter_schema {
|
||||||
|
|
||||||
max_conn_rate.desc:
|
max_conn_rate.desc:
|
||||||
"""Maximum connection rate.<br/>
|
"""Used to limit the rate at which the current listener accepts connections.
|
||||||
This is used to limit the connection rate for this node.
|
|
||||||
Once the limit is reached, new connections will be deferred or refused.<br/>
|
Once the limit is reached, EMQX will pause fetching connections from the Accept queue, thereby delaying or rejecting new connections.
|
||||||
For example:<br/>
|
|
||||||
- <code>1000/s</code> :: Only accepts 1000 connections per second<br/>
|
For example:
|
||||||
- <code>1000/10s</code> :: Only accepts 1000 connections every 10 seconds."""
|
|
||||||
|
- `1000/s`: Only accepts 1000 connections per second.
|
||||||
|
- `1000/10s`: Only accepts 1000 connections every 10 seconds."""
|
||||||
|
|
||||||
max_conn_rate.label:
|
max_conn_rate.label:
|
||||||
"""Maximum Connection Rate"""
|
"""Maximum Connection Rate"""
|
||||||
|
|
||||||
messages_rate.desc:
|
messages_rate.desc:
|
||||||
"""Messages publish rate.<br/>
|
"""Used to limit the number of messages a single client can send to EMQX per second.
|
||||||
This is used to limit the inbound message numbers for this node.
|
|
||||||
Once the limit is reached, the restricted client will slow down and even be hung for a while.<br/>
|
Once the limit is reached, EMQX will pause reading data from the receive-buffer, thus slowing down or even temporarily hanging the sender.
|
||||||
For example:<br/>
|
|
||||||
- <code>500/s</code> :: Only the first 500 messages are sent per second and other messages are buffered.<br/>
|
For example:
|
||||||
- <code>500/10s</code> :: Only the first 500 messages are sent even 10 second and other messages are buffered."""
|
|
||||||
|
- `500/s`: Only 500 messages will be received per second, and the remaining messages will be delayed.
|
||||||
|
- `500/10s`: Only 500 messages will be received every 10 seconds and the remaining messages will be delayed."""
|
||||||
|
|
||||||
messages_rate.label:
|
messages_rate.label:
|
||||||
"""Messages Publish Rate"""
|
"""Messages Publish Rate"""
|
||||||
|
|
||||||
bytes_rate.desc:
|
bytes_rate.desc:
|
||||||
"""Data publish rate.<br/>
|
"""Used to limit the number of bytes a single client can send to EMQX per second.
|
||||||
This is used to limit the inbound bytes rate for this node.
|
|
||||||
Once the limit is reached, the restricted client will slow down and even be hung for a while.<br/>
|
Once the limit is reached, EMQX will pause reading data from the receive-buffer, thus slowing down or even temporarily hanging the sender.
|
||||||
The unit of the bytes could be:KB MB GB.<br/>
|
|
||||||
For example:<br/>
|
The unit of the bytes could be: B, KB, MB, GB.
|
||||||
- <code>500KB/s</code> :: Only the first 500 kilobytes are sent per second and other messages are buffered.<br/>
|
|
||||||
- <code>500MB/10s</code> :: Only the first 500 megabytes are sent even 10 second and other messages are buffered."""
|
For example:
|
||||||
|
|
||||||
|
- `500KB/s`: Only 500 kilobytes per second will be received, and the remaining bytes will be delayed.
|
||||||
|
- `500MB/10s`: Only 500 megabytes will be received every 10 seconds, and the remaining bytes will be delayed."""
|
||||||
|
|
||||||
bytes_rate.label:
|
bytes_rate.label:
|
||||||
"""Data Publish Rate"""
|
"""Data Publish Rate"""
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,9 @@ flow_control.desc:
|
||||||
"""Flow control."""
|
"""Flow control."""
|
||||||
|
|
||||||
max_payload_size.desc:
|
max_payload_size.desc:
|
||||||
"""Maximum retained message size."""
|
"""The maximum size of retained messages allowed to be stored. EMQX will refuse to store retained messages larger than this size and output an Error log with the keyword 'retain_failed_for_payload_size_exceeded_limit'.
|
||||||
|
|
||||||
|
0 means unlimited retained message size."""
|
||||||
|
|
||||||
max_retained_messages.desc:
|
max_retained_messages.desc:
|
||||||
"""Maximum number of retained messages. 0 means no limit."""
|
"""Maximum number of retained messages. 0 means no limit."""
|
||||||
|
@ -37,11 +39,14 @@ mnesia_enable.desc:
|
||||||
"""Enable built-in Mnesia backend."""
|
"""Enable built-in Mnesia backend."""
|
||||||
|
|
||||||
msg_clear_interval.desc:
|
msg_clear_interval.desc:
|
||||||
"""Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0."""
|
"""The time interval for checking and clearing expired retained messages. This can prevent expired retained messages from being stored for a long time.
|
||||||
|
|
||||||
|
If `msg_clear_interval` is set to 0, that is, expired retained messages are not actively checked regularly, EMQX will only check and delete expired retained messages when preparing for delivery."""
|
||||||
|
|
||||||
msg_expiry_interval.desc:
|
msg_expiry_interval.desc:
|
||||||
"""Message retention time. This config is only applicable for messages without the Message Expiry Interval message property.
|
"""Expired retained messages will not be delivered again, and a setting of 0 means that retained messages will never expire.
|
||||||
0 means message will never expire."""
|
|
||||||
|
However, if the `Message-Expiry-Interval` property is specified in the MQTT message, the value of that property prevails."""
|
||||||
|
|
||||||
stop_publish_clear_msg.desc:
|
stop_publish_clear_msg.desc:
|
||||||
"""When the retained flag of the `PUBLISH` message is set and Payload is empty,
|
"""When the retained flag of the `PUBLISH` message is set and Payload is empty,
|
||||||
|
|
|
@ -366,6 +366,12 @@ event_validation.desc:
|
||||||
event_validation.label:
|
event_validation.label:
|
||||||
"""Validation"""
|
"""Validation"""
|
||||||
|
|
||||||
|
event_transformation.desc:
|
||||||
|
"""Transformation"""
|
||||||
|
|
||||||
|
event_transformation.label:
|
||||||
|
"""Transformation"""
|
||||||
|
|
||||||
root_rule_info.desc:
|
root_rule_info.desc:
|
||||||
"""Schema for rule info"""
|
"""Schema for rule info"""
|
||||||
|
|
||||||
|
|
|
@ -113,7 +113,7 @@ sysmon_top_max_procs.label:
|
||||||
|
|
||||||
mqtt_use_username_as_clientid.desc:
|
mqtt_use_username_as_clientid.desc:
|
||||||
"""Whether to use Username as Client ID.
|
"""Whether to use Username as Client ID.
|
||||||
This setting takes effect later than <code>Use Peer Certificate as Username</code> and <code>Use peer certificate as Client ID</code>."""
|
This setting takes effect later than `peer_cert_as_username` and `peer_cert_as_clientid`."""
|
||||||
|
|
||||||
mqtt_use_username_as_clientid.label:
|
mqtt_use_username_as_clientid.label:
|
||||||
"""Use Username as Client ID"""
|
"""Use Username as Client ID"""
|
||||||
|
@ -186,7 +186,7 @@ mqtt_session_expiry_interval.label:
|
||||||
"""Session Expiry Interval"""
|
"""Session Expiry Interval"""
|
||||||
|
|
||||||
mqtt_message_expiry_interval.desc:
|
mqtt_message_expiry_interval.desc:
|
||||||
"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the Message-Expiry-Interval property is not set in the message; otherwise, the value of the Message-Expiry-Interval property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting message_expiry_interval greater than session_expiry_interval is meaningless, as all messages will be cleared when the session expires."""
|
"""The expiry interval of MQTT messages. For MQTT 5.0 clients, this configuration will only take effect when the `Message-Expiry-Interval` property is not set in the message; otherwise, the value of the `Message-Expiry-Interval` property will be used. For MQTT versions older than 5.0, this configuration will always take effect. Please note that setting `message_expiry_interval` greater than `session_expiry_interval` is meaningless, as all messages will be cleared when the session expires."""
|
||||||
|
|
||||||
mqtt_message_expiry_interval.label:
|
mqtt_message_expiry_interval.label:
|
||||||
"""Message Expiry Interval"""
|
"""Message Expiry Interval"""
|
||||||
|
@ -270,7 +270,7 @@ common_ssl_opts_schema_cacerts.label:
|
||||||
|
|
||||||
fields_ws_opts_mqtt_path.desc: """~
|
fields_ws_opts_mqtt_path.desc: """~
|
||||||
WebSocket's MQTT protocol path. By default, the full URL for the WebSocket client to connect is:
|
WebSocket's MQTT protocol path. By default, the full URL for the WebSocket client to connect is:
|
||||||
`ws://{ip}:{port}/mqtt`.
|
`ws://{host}:{port}/mqtt`.
|
||||||
Append `/[...]` to the end of the path to make EMQX accept any subpath.
|
Append `/[...]` to the end of the path to make EMQX accept any subpath.
|
||||||
For example, specifying `mqtt/[...]` would allow clients to connect at paths like
|
For example, specifying `mqtt/[...]` would allow clients to connect at paths like
|
||||||
`mqtt/org1` or `mqtt/group2`, etc.
|
`mqtt/org1` or `mqtt/group2`, etc.
|
||||||
|
@ -348,7 +348,13 @@ fields_mqtt_quic_listener_retry_memory_limit.label:
|
||||||
"""Retry memory limit"""
|
"""Retry memory limit"""
|
||||||
|
|
||||||
force_shutdown_max_mailbox_size.desc:
|
force_shutdown_max_mailbox_size.desc:
|
||||||
"""In EMQX, each online client corresponds to an individual Erlang process. The configuration value establishes a mailbox size limit for these processes. If the mailbox size surpasses this limit, the client will be automatically terminated."""
|
"""EMQX creates at least one lightweight process for each client connection.
|
||||||
|
|
||||||
|
Each process has its own message queue (aka mailbox) to hold messages from other processes (e.g. MQTT messages) so that the process can read messages from the message queue (mailbox) at any time.
|
||||||
|
|
||||||
|
If the system is busy or the process hangs due to a busy socket (see `high_watermark`), the message queue can accumulate many messages.
|
||||||
|
|
||||||
|
To avoid excessive memory usage, EMQX will force a process to shut down when the length of its message queue exceeds `max_mailbox_size`."""
|
||||||
|
|
||||||
force_shutdown_max_mailbox_size.label:
|
force_shutdown_max_mailbox_size.label:
|
||||||
"""Maximum mailbox size."""
|
"""Maximum mailbox size."""
|
||||||
|
@ -537,7 +543,11 @@ server_ssl_opts_schema_ocsp_refresh_http_timeout.label:
|
||||||
"""OCSP Refresh HTTP Timeout"""
|
"""OCSP Refresh HTTP Timeout"""
|
||||||
|
|
||||||
fields_tcp_opts_send_timeout.desc:
|
fields_tcp_opts_send_timeout.desc:
|
||||||
"""The TCP send timeout for the connections."""
|
"""The maximum time a process is suspended for sending data to a busy socket. After the timeout, the TCP connection and the process will be closed.
|
||||||
|
|
||||||
|
The process is unsuspended only when the socket is unbusy, that is, the data accumulated in the Erlang internal buffer drops from the high watermark (specified by `high_watermark`) to the low watermark (default 4 KB).
|
||||||
|
|
||||||
|
Therefore, `(high_watermark - 4 KB) / send_timeout` must be a suitable message outflow speed, otherwise the suspended process will never be able to recover before the timeout."""
|
||||||
|
|
||||||
fields_tcp_opts_send_timeout.label:
|
fields_tcp_opts_send_timeout.label:
|
||||||
"""TCP send timeout"""
|
"""TCP send timeout"""
|
||||||
|
@ -557,10 +567,11 @@ fields_tcp_opts_buffer.label:
|
||||||
"""TCP user-space buffer"""
|
"""TCP user-space buffer"""
|
||||||
|
|
||||||
server_ssl_opts_schema_honor_cipher_order.desc:
|
server_ssl_opts_schema_honor_cipher_order.desc:
|
||||||
"""An important security setting. It forces the cipher to be set based
|
"""An important security setting. If this setting is enabled, the server will prioritize the cipher suites it prefers most from the list of cipher suites supported by the client, thus ignoring the client's preferences.
|
||||||
on the server-specified order instead of the client-specified order,
|
|
||||||
hence enforcing the (usually more properly configured) security
|
The server's cipher suites are specified by `ciphers`, with preference decreasing from left to right.
|
||||||
ordering of the server administrator."""
|
|
||||||
|
It is often better to use the server's preferences, as it is more likely that the server will be configured correctly."""
|
||||||
|
|
||||||
server_ssl_opts_schema_honor_cipher_order.label:
|
server_ssl_opts_schema_honor_cipher_order.label:
|
||||||
"""SSL honor cipher order"""
|
"""SSL honor cipher order"""
|
||||||
|
@ -597,15 +608,19 @@ fields_mqtt_quic_listener_stream_recv_window_default.label:
|
||||||
"""Stream recv window default"""
|
"""Stream recv window default"""
|
||||||
|
|
||||||
mqtt_mqueue_priorities.desc:
|
mqtt_mqueue_priorities.desc:
|
||||||
"""Topic priorities. Priority number [1-255]
|
"""Topic priority list. Prioritize messages in the message queue by topic. The priority range is `[1, 255]`.
|
||||||
There's no priority table by default, hence all messages are treated equal.
|
|
||||||
|
|
||||||
**NOTE**: Comma and equal signs are not allowed for priority topic names.
|
The larger the value, the higher the priority. Messages with higher priority will be sent first.
|
||||||
**NOTE**: Messages for topics not in the priority table are treated as either highest or lowest priority depending on the configured value for <code>mqtt.mqueue_default_priority</code>.
|
|
||||||
|
|
||||||
**Examples**:
|
Topics not in this list will use the default priority (specified by `mqueue_default_priority`).
|
||||||
To configure <code>"topic/1" > "topic/2"</code>:
|
|
||||||
<code>mqueue_priorities: {"topic/1": 10, "topic/2": 8}</code>"""
|
By default, this list is empty, which means all topics have the same priority.
|
||||||
|
|
||||||
|
Note: commas and equal signs are not supported in topic names.
|
||||||
|
|
||||||
|
For example, if you want `topic/1` to have a higher priority than `topic/2`, you can configure it like this:
|
||||||
|
|
||||||
|
`mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}`"""
|
||||||
|
|
||||||
mqtt_mqueue_priorities.label:
|
mqtt_mqueue_priorities.label:
|
||||||
"""Topic Priorities"""
|
"""Topic Priorities"""
|
||||||
|
@ -623,7 +638,9 @@ fields_rate_limit_max_conn_rate.label:
|
||||||
"""Max connection rate"""
|
"""Max connection rate"""
|
||||||
|
|
||||||
alarm_size_limit.desc:
|
alarm_size_limit.desc:
|
||||||
"""The maximum total number of deactivated alarms to keep as history.<br/>When this limit is exceeded, the oldest deactivated alarms are deleted to cap the total number."""
|
"""The maximum number of historical alarms that can be stored.
|
||||||
|
|
||||||
|
When the maximum number is reached, the oldest historical alarms will be deleted to store new historical alarms."""
|
||||||
|
|
||||||
alarm_size_limit.label:
|
alarm_size_limit.label:
|
||||||
"""Alarm size limit"""
|
"""Alarm size limit"""
|
||||||
|
@ -647,7 +664,9 @@ conn_congestion_enable_alarm.label:
|
||||||
"""Enable/disable congestion alarm"""
|
"""Enable/disable congestion alarm"""
|
||||||
|
|
||||||
fields_ws_opts_proxy_port_header.desc:
|
fields_ws_opts_proxy_port_header.desc:
|
||||||
"""HTTP header used to pass information about the client port. Relevant when the EMQX cluster is deployed behind a load-balancer."""
|
"""The HTTP request header that carries the original client's source port, EMQX will take the leftmost port in the header as the original client's source port.
|
||||||
|
|
||||||
|
This option is typically used when EMQX is deployed behind a WebSocket proxy."""
|
||||||
|
|
||||||
fields_ws_opts_proxy_port_header.label:
|
fields_ws_opts_proxy_port_header.label:
|
||||||
"""Proxy port header"""
|
"""Proxy port header"""
|
||||||
|
@ -740,7 +759,7 @@ fields_tcp_opts_recbuf.label:
|
||||||
"""TCP receive buffer"""
|
"""TCP receive buffer"""
|
||||||
|
|
||||||
sysmon_vm_process_check_interval.desc:
|
sysmon_vm_process_check_interval.desc:
|
||||||
"""The time interval for the periodic process limit check."""
|
"""The time interval for the periodic process count limit check, used together with `process_high_watermark` and `process_low_watermark`."""
|
||||||
|
|
||||||
sysmon_vm_process_check_interval.label:
|
sysmon_vm_process_check_interval.label:
|
||||||
"""Process limit check interval"""
|
"""Process limit check interval"""
|
||||||
|
@ -752,8 +771,9 @@ fields_mqtt_quic_listener_server_resumption_level.label:
|
||||||
"""Server resumption level"""
|
"""Server resumption level"""
|
||||||
|
|
||||||
fields_ws_opts_proxy_address_header.desc:
|
fields_ws_opts_proxy_address_header.desc:
|
||||||
"""HTTP header used to pass information about the client IP address.
|
"""The HTTP request header that carries the original client's IP address, EMQX will take the leftmost IP in the header as the original client's IP.
|
||||||
Relevant when the EMQX cluster is deployed behind a load-balancer."""
|
|
||||||
|
This option is typically used when EMQX is deployed behind a WebSocket proxy."""
|
||||||
|
|
||||||
fields_ws_opts_proxy_address_header.label:
|
fields_ws_opts_proxy_address_header.label:
|
||||||
"""Proxy address header"""
|
"""Proxy address header"""
|
||||||
|
@ -766,8 +786,15 @@ sysmon_os_sysmem_high_watermark.label:
|
||||||
"""SysMem high wartermark"""
|
"""SysMem high wartermark"""
|
||||||
|
|
||||||
fields_tcp_opts_high_watermark.desc:
|
fields_tcp_opts_high_watermark.desc:
|
||||||
"""The socket is set to a busy state when the amount of data queued internally
|
"""When EMQX tries to send more data than the OS has allocated for the socket's send buffer, the remaining data will be temporarily stored in Erlang's internal buffer and then sent in the background.
|
||||||
by the VM socket implementation reaches this limit."""
|
|
||||||
|
If the amount of data queued in the internal buffer exceeds `high_watermark`, the corresponding socket will be marked as busy.
|
||||||
|
|
||||||
|
The process sending data to this busy socket will be suspended until the socket is no longer busy, or the suspension time exceeds `send_timeout`.
|
||||||
|
|
||||||
|
The socket will only be unbusy when the data in the internal buffer is below the low watermark.
|
||||||
|
|
||||||
|
While the process is suspended, the message queue of the process may accumulate, see `max_mailbox_len` for details."""
|
||||||
|
|
||||||
fields_tcp_opts_high_watermark.label:
|
fields_tcp_opts_high_watermark.label:
|
||||||
"""TCP high watermark"""
|
"""TCP high watermark"""
|
||||||
|
@ -801,10 +828,16 @@ mqtt_max_topic_levels.label:
|
||||||
"""Max Topic Levels"""
|
"""Max Topic Levels"""
|
||||||
|
|
||||||
force_shutdown_max_heap_size.desc:
|
force_shutdown_max_heap_size.desc:
|
||||||
"""Total heap size. Setting this to 0 disables this limitation."""
|
"""The maximum heap size of the process. If the `force_shutdown` is enabled, processes that exceed this limit will automatically exit or be forcibly killed. Messages in the process message queue (mailbox) are also part of the heap. The shutdown of a process can be divided into the following two situations:
|
||||||
|
|
||||||
|
- The process actively checks the current heap size during its own operation, and actively exits after finding that it exceeds the limit.
|
||||||
|
- The underlying scheduling system checks the current heap size after performing garbage collection for the process, and forcibly kills the process after finding that it exceeds the limit.
|
||||||
|
|
||||||
|
Note: The Error logs generated by the above two will be different. The log generated by the former is similar to `...errorContext: connection_shutdown, reason: #{max => 2097152, reason => proc_heap_too_large, value => 2787348}..`,
|
||||||
|
and the log generated by the latter is similar to `...Context: maximum heap size reached...`."""
|
||||||
|
|
||||||
force_shutdown_max_heap_size.label:
|
force_shutdown_max_heap_size.label:
|
||||||
"""Total heap size"""
|
"""Maximum Process Heap Size"""
|
||||||
|
|
||||||
mqtt_ignore_loop_deliver.desc:
|
mqtt_ignore_loop_deliver.desc:
|
||||||
"""Whether the messages sent by the MQTT v3.1.1/v3.1.0 client will be looped back to the publisher itself, similar to <code>No Local</code> in MQTT 5.0."""
|
"""Whether the messages sent by the MQTT v3.1.1/v3.1.0 client will be looped back to the publisher itself, similar to <code>No Local</code> in MQTT 5.0."""
|
||||||
|
@ -871,17 +904,18 @@ force_gc_bytes.label:
|
||||||
"""Process GC bytes"""
|
"""Process GC bytes"""
|
||||||
|
|
||||||
server_ssl_opts_schema_fail_if_no_peer_cert.desc:
|
server_ssl_opts_schema_fail_if_no_peer_cert.desc:
|
||||||
"""Used together with {verify, verify_peer} by an TLS/DTLS server.
|
"""This option is only effective if `verify` is set to `verify_peer`.
|
||||||
If set to true, the server fails if the client does not have a
|
|
||||||
certificate to send, that is, sends an empty certificate.
|
If set to `true`, EMQX will reject the connection if the client fails to provide a certificate.
|
||||||
If set to false, it fails only if the client sends an invalid
|
|
||||||
certificate (an empty certificate is considered valid)."""
|
If set to `false`, EMQX will accept clients which don't present a certificate."""
|
||||||
|
|
||||||
server_ssl_opts_schema_fail_if_no_peer_cert.label:
|
server_ssl_opts_schema_fail_if_no_peer_cert.label:
|
||||||
"""SSL fail if no peer cert"""
|
"""SSL fail if no peer cert"""
|
||||||
|
|
||||||
fields_ws_opts_compress.desc:
|
fields_ws_opts_compress.desc:
|
||||||
"""If <code>true</code>, compress WebSocket messages using <code>zlib</code>.<br/>
|
"""If <code>true</code>, compress WebSocket messages using <code>zlib</code>.
|
||||||
|
|
||||||
The configuration items under <code>deflate_opts</code> belong to the compression-related parameter configuration."""
|
The configuration items under <code>deflate_opts</code> belong to the compression-related parameter configuration."""
|
||||||
|
|
||||||
fields_ws_opts_compress.label:
|
fields_ws_opts_compress.label:
|
||||||
|
@ -998,8 +1032,8 @@ fields_ws_opts_supported_subprotocols.label:
|
||||||
mqtt_shared_subscription_strategy.desc:
|
mqtt_shared_subscription_strategy.desc:
|
||||||
"""Dispatch strategy for shared subscription.
|
"""Dispatch strategy for shared subscription.
|
||||||
- `random`: Randomly select a subscriber for dispatch;
|
- `random`: Randomly select a subscriber for dispatch;
|
||||||
- `round_robin`: Messages from a single publisher are dispatched to subscribers in turn;
|
- `round_robin`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each publisher, so two adjacent messages from **different publishers** may be consumed by the same client in the subscription group;
|
||||||
- `round_robin_per_group`: All messages are dispatched to subscribers in turn;
|
- `round_robin_per_group`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each node, so two adjacent messages from **different nodes** may be consumed by the same client in the subscription group;
|
||||||
- `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
|
- `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
|
||||||
- `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends;
|
- `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends;
|
||||||
- `hash_clientid`: Hash the publisher's client ID to select a subscriber;
|
- `hash_clientid`: Hash the publisher's client ID to select a subscriber;
|
||||||
|
@ -1025,8 +1059,9 @@ base_listener_limiter.label:
|
||||||
"""Type of the rate limit."""
|
"""Type of the rate limit."""
|
||||||
|
|
||||||
alarm_validity_period.desc:
|
alarm_validity_period.desc:
|
||||||
"""Retention time of deactivated alarms. Alarms are not deleted immediately
|
"""The validity period of historical alarms. Calculated from the time of activation of the historical alarm instead of the time of cancelation.
|
||||||
when deactivated, but after the retention time."""
|
|
||||||
|
If it exists longer than the validity period, the alarm will be deleted."""
|
||||||
|
|
||||||
alarm_validity_period.label:
|
alarm_validity_period.label:
|
||||||
"""Alarm validity period"""
|
"""Alarm validity period"""
|
||||||
|
@ -1071,13 +1106,16 @@ common_ssl_opts_schema_password.desc:
|
||||||
"""String containing the user's password. Only used if the private key file is password-protected."""
|
"""String containing the user's password. Only used if the private key file is password-protected."""
|
||||||
|
|
||||||
common_ssl_opts_schema_password.label:
|
common_ssl_opts_schema_password.label:
|
||||||
"""Keyfile passphrase"""
|
"""Keyfile Passphrase"""
|
||||||
|
|
||||||
common_ssl_opts_schema_hibernate_after.desc:
|
common_ssl_opts_schema_hibernate_after.desc:
|
||||||
"""Hibernate the SSL process after idling for amount of time reducing its memory footprint."""
|
"""Specifies the amount of time that an SSL process will hibernate after being idle, thus reducing its memory footprint.
|
||||||
|
|
||||||
|
The hibernating process will be woken up when a new message arrives.
|
||||||
|
Hibernating and waking up too often can cause CPU utilization to increase, as they both perform garbage collection on the process."""
|
||||||
|
|
||||||
common_ssl_opts_schema_hibernate_after.label:
|
common_ssl_opts_schema_hibernate_after.label:
|
||||||
"""hibernate after"""
|
"""Hibernate After"""
|
||||||
|
|
||||||
fields_mqtt_quic_listener_send_buffering_enabled.desc:
|
fields_mqtt_quic_listener_send_buffering_enabled.desc:
|
||||||
"""Buffer send data instead of holding application buffers until sent data is acknowledged. Default: 1 (Enabled)"""
|
"""Buffer send data instead of holding application buffers until sent data is acknowledged. Default: 1 (Enabled)"""
|
||||||
|
@ -1148,14 +1186,13 @@ fields_deflate_opts_strategy.label:
|
||||||
|
|
||||||
shared_subscription_strategy_enum.desc:
|
shared_subscription_strategy_enum.desc:
|
||||||
"""Dispatch strategy for shared subscription.
|
"""Dispatch strategy for shared subscription.
|
||||||
- `random`: dispatch the message to a random selected subscriber
|
- `random`: Randomly select a subscriber for dispatch;
|
||||||
- `round_robin`: select the subscribers in a round-robin manner
|
- `round_robin`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each publisher, so two adjacent messages from **different publishers** may be consumed by the same client in the subscription group;
|
||||||
- `round_robin_per_group`: select the subscribers in round-robin fashion within each shared subscriber group
|
- `round_robin_per_group`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each node, so two adjacent messages from **different nodes** may be consumed by the same client in the subscription group;
|
||||||
- `sticky`: always use the last selected subscriber to dispatch,
|
- `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
|
||||||
until the subscriber disconnects.
|
- `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends;
|
||||||
- `hash`: select the subscribers by the hash of `clientIds`
|
- `hash_clientid`: Hash the publisher's client ID to select a subscriber;
|
||||||
- `local`: send to a random local subscriber. If local
|
- `hash_topic`: Hash the publishing topic to select a subscriber."""
|
||||||
subscriber was not found, send to a random subscriber cluster-wide"""
|
|
||||||
|
|
||||||
mqtt_mqueue_store_qos0.desc:
|
mqtt_mqueue_store_qos0.desc:
|
||||||
"""Specifies whether to store QoS 0 messages in the message queue while the connection is down but the session remains."""
|
"""Specifies whether to store QoS 0 messages in the message queue while the connection is down but the session remains."""
|
||||||
|
@ -1196,10 +1233,10 @@ fields_mqtt_quic_listener_max_stateless_operations.label:
|
||||||
"""Max stateless operations"""
|
"""Max stateless operations"""
|
||||||
|
|
||||||
fields_ws_opts_idle_timeout.desc:
|
fields_ws_opts_idle_timeout.desc:
|
||||||
"""Close transport-layer connections from the clients that have not sent MQTT CONNECT message within this interval."""
|
"""The timeout for waiting for the WebSocket upgrade request. After the timeout, the connection will be closed."""
|
||||||
|
|
||||||
fields_ws_opts_idle_timeout.label:
|
fields_ws_opts_idle_timeout.label:
|
||||||
"""WS idle timeout"""
|
"""WebSocket Upgrade Timeout"""
|
||||||
|
|
||||||
fields_mqtt_quic_listener_max_ack_delay_ms.desc:
|
fields_mqtt_quic_listener_max_ack_delay_ms.desc:
|
||||||
"""How long to wait after receiving data before sending an ACK. Default: 25"""
|
"""How long to wait after receiving data before sending an ACK. Default: 25"""
|
||||||
|
@ -1241,8 +1278,7 @@ fields_ws_opts_allow_origin_absence.label:
|
||||||
"""Allow origin absence"""
|
"""Allow origin absence"""
|
||||||
|
|
||||||
fields_ws_opts_validate_utf8.desc:
|
fields_ws_opts_validate_utf8.desc:
|
||||||
"""Set to <code>false</code> to disable WebSocket Frame UTF-8
|
"""Whether to verify that the payload of `text` and `close` frames is valid UTF-8. Disabling it can save resources and improve performance."""
|
||||||
validation for performance"""
|
|
||||||
|
|
||||||
fields_ws_opts_validate_utf8.label:
|
fields_ws_opts_validate_utf8.label:
|
||||||
"""Enable/Disable WebSocket Frame utf8 validation"""
|
"""Enable/Disable WebSocket Frame utf8 validation"""
|
||||||
|
@ -1257,7 +1293,11 @@ common_ssl_opts_schema_versions.label:
|
||||||
"""SSL versions"""
|
"""SSL versions"""
|
||||||
|
|
||||||
mqtt_listener_proxy_protocol_timeout.desc:
|
mqtt_listener_proxy_protocol_timeout.desc:
|
||||||
"""Timeout for proxy protocol. EMQX will close the TCP connection if proxy protocol packet is not received within the timeout."""
|
"""If a reverse proxy is deployed for EMQX, and the PROXY protocol is enabled at the proxy to pass the client's real IP, this option needs to be turned on so that EMQX can extract the client's real IP from the PROXY protocol header.
|
||||||
|
|
||||||
|
EMQX will automatically detect the version of the PROXY protocol and support V1 and V2.
|
||||||
|
|
||||||
|
For a detailed description of the PROXY protocol, please refer to: https://www.haproxy.com/blog/haproxy/proxy-protocol/"""
|
||||||
|
|
||||||
mqtt_listener_proxy_protocol_timeout.label:
|
mqtt_listener_proxy_protocol_timeout.label:
|
||||||
"""Proxy protocol timeout"""
|
"""Proxy protocol timeout"""
|
||||||
|
@ -1269,19 +1309,23 @@ fields_mqtt_quic_listener_idle_timeout.label:
|
||||||
"""Idle Timeout"""
|
"""Idle Timeout"""
|
||||||
|
|
||||||
common_ssl_opts_schema_secure_renegotiate.desc:
|
common_ssl_opts_schema_secure_renegotiate.desc:
|
||||||
"""SSL parameter renegotiation is a feature that allows a client and a server
|
"""Whether to reject TLS renegotiation attempts that are not compliant with [RFC 5746](http://www.ietf.org/rfc/rfc5746.txt).
|
||||||
to renegotiate the parameters of the SSL connection on the fly.
|
|
||||||
RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation,
|
By default, `secure_renegotiate` is set to `true`, which forces secure renegotiation.
|
||||||
you drop support for the insecure renegotiation, prone to MitM attacks.<br/>
|
If set to `false`, secure renegotiation will still be used, but will fall back to insecure renegotiation if the peer does not support [RFC 5746](http://www.ietf.org/rfc/rfc5746.txt), which increases the risk of a MitM attack.
|
||||||
Has no effect when TLS version is configured (or negotiated) to 1.3"""
|
|
||||||
|
Has no effect when TLS version is configured (or negotiated) to 1.3."""
|
||||||
|
|
||||||
common_ssl_opts_schema_secure_renegotiate.label:
|
common_ssl_opts_schema_secure_renegotiate.label:
|
||||||
"""SSL renegotiate"""
|
"""SSL Secure Renegotiation"""
|
||||||
|
|
||||||
common_ssl_opts_schema_log_level.desc:
|
common_ssl_opts_schema_log_level.desc:
|
||||||
"""Log level for SSL communication. Default is 'notice'. Set to 'debug' to inspect TLS handshake messages."""
|
"""The minimum level of logging allowed for SSL output.
|
||||||
|
|
||||||
|
The default is `notice`, set to a lower `debug` level for more detailed logging that can be used to investigate SSL handshake issues."""
|
||||||
|
|
||||||
common_ssl_opts_schema_log_level.label:
|
common_ssl_opts_schema_log_level.label:
|
||||||
"""SSL log level"""
|
"""SSL Log Level"""
|
||||||
|
|
||||||
sysmon_vm_busy_port.desc:
|
sysmon_vm_busy_port.desc:
|
||||||
"""When a port (e.g. TCP socket) is overloaded, there will be a <code>busy_port</code> warning log,
|
"""When a port (e.g. TCP socket) is overloaded, there will be a <code>busy_port</code> warning log,
|
||||||
|
@ -1312,7 +1356,7 @@ common_ssl_opts_schema_reuse_sessions.desc:
|
||||||
Has no effect when TLS version is configured (or negotiated) to 1.3"""
|
Has no effect when TLS version is configured (or negotiated) to 1.3"""
|
||||||
|
|
||||||
common_ssl_opts_schema_reuse_sessions.label:
|
common_ssl_opts_schema_reuse_sessions.label:
|
||||||
"""TLS session reuse"""
|
"""TLS Session Reuse"""
|
||||||
|
|
||||||
common_ssl_opts_schema_depth.desc:
|
common_ssl_opts_schema_depth.desc:
|
||||||
"""Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path.
|
"""Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path.
|
||||||
|
@ -1442,7 +1486,12 @@ mqtt_listener_proxy_protocol.label:
|
||||||
"""Proxy protocol"""
|
"""Proxy protocol"""
|
||||||
|
|
||||||
mqtt_listener_access_rules.desc:
|
mqtt_listener_access_rules.desc:
|
||||||
"""The access control rules for this listener.<br/>See: https://github.com/emqtt/esockd#allowdeny"""
|
"""An access rule list consisting of string rules to restrict or allow access from some addresses. The rules that appear earlier in the list are matched first.
|
||||||
|
The format is `allow | deny <address> | <CIDR> | all`.
|
||||||
|
|
||||||
|
For example:
|
||||||
|
|
||||||
|
`[\"deny 192.168.1.1\", \"allow 192.168.1.0/24\", \"deny, all\"]`"""
|
||||||
|
|
||||||
mqtt_listener_access_rules.label:
|
mqtt_listener_access_rules.label:
|
||||||
"""Access rules"""
|
"""Access rules"""
|
||||||
|
@ -1536,9 +1585,7 @@ Topic match performance (when publishing) may degrade if messages are mostly pub
|
||||||
NOTE: This is a cluster-wide configuration. It requires all nodes to be stopped before changing it."""
|
NOTE: This is a cluster-wide configuration. It requires all nodes to be stopped before changing it."""
|
||||||
|
|
||||||
sysmon_vm_large_heap.desc:
|
sysmon_vm_large_heap.desc:
|
||||||
"""When an Erlang process consumed a large amount of memory for its heap space,
|
"""When the heap memory occupied by a process exceeds the size specified by `large_heap`, the system will write a warning level `large_heap` log, and an MQTT message will be published to the system topic `$SYS/sysmon/large_heap`."""
|
||||||
the system will write a warning level <code>large_heap</code> log, and an MQTT message is published to
|
|
||||||
the system topic <code>$SYS/sysmon/large_heap</code>."""
|
|
||||||
|
|
||||||
sysmon_vm_large_heap.label:
|
sysmon_vm_large_heap.label:
|
||||||
"""Enable Large Heap monitoring."""
|
"""Enable Large Heap monitoring."""
|
||||||
|
|
|
@ -1,18 +1,25 @@
|
||||||
emqx_slow_subs_schema {
|
emqx_slow_subs_schema {
|
||||||
|
|
||||||
enable.desc:
|
enable.desc:
|
||||||
"""Enable this feature"""
|
"""Enable Slow Subscriptions"""
|
||||||
|
|
||||||
expire_interval.desc:
|
expire_interval.desc:
|
||||||
"""The eviction time of the record, which in the statistics record table"""
|
"""The expiration time of the slow subscription record, if the record is not updated within the expiration time, then the record will be deleted."""
|
||||||
|
|
||||||
stats_type.desc:
|
stats_type.desc:
|
||||||
"""The method to calculate the latency"""
|
"""Message latency calculation method:
|
||||||
|
|
||||||
|
- `whole`: The time from when the message arrives at the EMQX (the EMQX gets the message from the receive-buffer) until the message completes delivery.
|
||||||
|
- `internal`: The time from when the message arrives at the EMQX (the EMQX gets the message from the receive-buffer) to when the message begins to be delivered (the EMQX attempts to write the message to the send-buffer).
|
||||||
|
- `response`: The time from the start of message delivery to the completion.
|
||||||
|
|
||||||
|
Note: The completion delivery time refers to the time when QoS 1 and 2 messages complete the MQTT message response process, i.e., the time when QoS 1 message receives the PUBACK packet and QoS 2 message receives the PUBCOMP packet.
|
||||||
|
Since there is no response packet for QoS 0 message, the completion delivery time of the QoS 0 message will be replaced by the time when the message starts to be delivered. Therefore, when using the `response` method to calculate the latency, the latency of a QoS 0 message will always be equal to 0."""
|
||||||
|
|
||||||
threshold.desc:
|
threshold.desc:
|
||||||
"""The latency threshold for statistics"""
|
"""The Client ID and topic of the consumer whose message latency is greater than this threshold will be recorded in the slow subscription list."""
|
||||||
|
|
||||||
top_k_num.desc:
|
top_k_num.desc:
|
||||||
"""The maximum number of records in the slow subscription statistics record table"""
|
"""The maximum number of slow-subscription records, up to a maximum of 1000."""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
emqx_telemetry_schema {
|
emqx_telemetry_schema {
|
||||||
telemetry_root_doc.desc:
|
telemetry_root_doc.desc:
|
||||||
"""Configure telemetry data report from this EMQX node to EMQ's telemetry data collection server.
|
"""Whether to enable telemetry to allow EMQX to collect relevant usage information and share it with EMQ for the purpose of enhancing your product experience, and in no case will EMQX collect personal information about you, such as your MAC address, IP address, content of messages sent.
|
||||||
See https://www.emqx.io/docs/en/v5.0/telemetry/telemetry.html for more details."""
|
|
||||||
|
See https://docs.emqx.com/en/emqx/latest/telemetry/telemetry.html for more details."""
|
||||||
|
|
||||||
enable.desc:
|
enable.desc:
|
||||||
"""Set to `false` disable telemetry data report"""
|
"""Set to `false` disable telemetry data report"""
|
||||||
|
|
|
@ -56,6 +56,7 @@ PSK
|
||||||
PSK
|
PSK
|
||||||
PSKs
|
PSKs
|
||||||
PUBACK
|
PUBACK
|
||||||
|
PUBCOMP
|
||||||
PUBREL
|
PUBREL
|
||||||
PUBLISH
|
PUBLISH
|
||||||
QoS
|
QoS
|
||||||
|
@ -195,6 +196,7 @@ params
|
||||||
peerhost
|
peerhost
|
||||||
peername
|
peername
|
||||||
perf
|
perf
|
||||||
|
pipelining
|
||||||
powershell
|
powershell
|
||||||
procmem
|
procmem
|
||||||
procs
|
procs
|
||||||
|
@ -258,6 +260,7 @@ typerefl
|
||||||
udp
|
udp
|
||||||
uid
|
uid
|
||||||
un-acked
|
un-acked
|
||||||
|
unbusy
|
||||||
unsub
|
unsub
|
||||||
uplink
|
uplink
|
||||||
url
|
url
|
||||||
|
|
Loading…
Reference in New Issue