Merge pull request #10803 from JimMoen/merge-release-50
Merge release-50 into master
This commit is contained in:
commit
14a6b36899
|
@ -40,3 +40,5 @@
|
||||||
session,
|
session,
|
||||||
will_msg
|
will_msg
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(EXPIRE_INTERVAL_INFINITE, 4294967295000).
|
||||||
|
|
|
@ -2088,7 +2088,7 @@ maybe_resume_session(#channel{
|
||||||
|
|
||||||
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
case maps:get(expiry_interval, ConnInfo) of
|
case maps:get(expiry_interval, ConnInfo) of
|
||||||
?UINT_MAX ->
|
?EXPIRE_INTERVAL_INFINITE ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
I when I > 0 ->
|
I when I > 0 ->
|
||||||
{ok, ensure_timer(expire_timer, I, Channel)};
|
{ok, ensure_timer(expire_timer, I, Channel)};
|
||||||
|
|
|
@ -773,6 +773,7 @@ mark_channel_connected(ChanPid) ->
|
||||||
mark_channel_disconnected(ChanPid) ->
|
mark_channel_disconnected(ChanPid) ->
|
||||||
?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
|
?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}),
|
||||||
ets:delete(?CHAN_LIVE_TAB, ChanPid),
|
ets:delete(?CHAN_LIVE_TAB, ChanPid),
|
||||||
|
?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_connected_client_count() ->
|
get_connected_client_count() ->
|
||||||
|
|
|
@ -1556,7 +1556,8 @@ fields("broker") ->
|
||||||
boolean(),
|
boolean(),
|
||||||
#{
|
#{
|
||||||
default => true,
|
default => true,
|
||||||
desc => ?DESC(broker_route_batch_clean)
|
desc => "This config is stale since 4.3",
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"perf",
|
{"perf",
|
||||||
|
|
|
@ -60,14 +60,12 @@
|
||||||
-export_type([sess_msg_key/0]).
|
-export_type([sess_msg_key/0]).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_channel.hrl").
|
||||||
-include("emqx_persistent_session.hrl").
|
-include("emqx_persistent_session.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-compile({inline, [is_store_enabled/0]}).
|
-compile({inline, [is_store_enabled/0]}).
|
||||||
|
|
||||||
%% 16#FFFFFFFF * 1000
|
|
||||||
-define(MAX_EXPIRY_INTERVAL, 4294967295000).
|
|
||||||
|
|
||||||
%% NOTE: Order is significant because of traversal order of the table.
|
%% NOTE: Order is significant because of traversal order of the table.
|
||||||
-define(MARKER, 3).
|
-define(MARKER, 3).
|
||||||
-define(DELIVERED, 2).
|
-define(DELIVERED, 2).
|
||||||
|
@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) ->
|
||||||
%% @private [MQTT-3.1.2-23]
|
%% @private [MQTT-3.1.2-23]
|
||||||
persistent_session_status(#session_store{expiry_interval = 0}) ->
|
persistent_session_status(#session_store{expiry_interval = 0}) ->
|
||||||
not_persistent;
|
not_persistent;
|
||||||
persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) ->
|
persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) ->
|
||||||
persistent;
|
persistent;
|
||||||
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
|
persistent_session_status(#session_store{expiry_interval = E, ts = TS}) ->
|
||||||
case E + TS > erlang:system_time(millisecond) of
|
case E + TS > erlang:system_time(millisecond) of
|
||||||
|
|
|
@ -211,7 +211,7 @@ send_message(BridgeId, Message) ->
|
||||||
|
|
||||||
query_opts(Config) ->
|
query_opts(Config) ->
|
||||||
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
|
case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of
|
||||||
Timeout when is_integer(Timeout) ->
|
Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
|
||||||
%% request_timeout is configured
|
%% request_timeout is configured
|
||||||
#{timeout => Timeout};
|
#{timeout => Timeout};
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
-ifndef(EMQX_BRIDGE_IOTDB_HRL).
|
-ifndef(EMQX_BRIDGE_IOTDB_HRL).
|
||||||
-define(EMQX_BRIDGE_IOTDB_HRL, true).
|
-define(EMQX_BRIDGE_IOTDB_HRL, true).
|
||||||
|
|
||||||
-define(VSN_1_0_X, 'v1.0.x').
|
-define(VSN_1_X, 'v1.x').
|
||||||
-define(VSN_0_13_X, 'v0.13.x').
|
-define(VSN_0_13_X, 'v0.13.x').
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge_iotdb, [
|
{application, emqx_bridge_iotdb, [
|
||||||
{description, "EMQX Enterprise Apache IoTDB Bridge"},
|
{description, "EMQX Enterprise Apache IoTDB Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{modules, [
|
{modules, [
|
||||||
emqx_bridge_iotdb,
|
emqx_bridge_iotdb,
|
||||||
emqx_bridge_iotdb_impl
|
emqx_bridge_iotdb_impl
|
||||||
|
|
|
@ -109,10 +109,10 @@ basic_config() ->
|
||||||
)},
|
)},
|
||||||
{iotdb_version,
|
{iotdb_version,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]),
|
hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("config_iotdb_version"),
|
desc => ?DESC("config_iotdb_version"),
|
||||||
default => ?VSN_1_0_X
|
default => ?VSN_1_X
|
||||||
}
|
}
|
||||||
)}
|
)}
|
||||||
] ++ resource_creation_opts() ++
|
] ++ resource_creation_opts() ++
|
||||||
|
@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) ->
|
||||||
is_aligned => false,
|
is_aligned => false,
|
||||||
device_id => <<"my_device">>,
|
device_id => <<"my_device">>,
|
||||||
base_url => <<"http://iotdb.local:18080/">>,
|
base_url => <<"http://iotdb.local:18080/">>,
|
||||||
iotdb_version => ?VSN_1_0_X,
|
iotdb_version => ?VSN_1_X,
|
||||||
connect_timeout => <<"15s">>,
|
connect_timeout => <<"15s">>,
|
||||||
pool_type => <<"random">>,
|
pool_type => <<"random">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
|
|
|
@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
preproc_data(DataList) ->
|
make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
|
||||||
|
emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
|
||||||
|
make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
|
||||||
|
lists:map(fun make_parsed_payload/1, PayloadUnparsed);
|
||||||
|
make_parsed_payload(
|
||||||
|
#{
|
||||||
|
measurement := Measurement,
|
||||||
|
data_type := DataType,
|
||||||
|
value := Value
|
||||||
|
} = Data
|
||||||
|
) ->
|
||||||
|
Data#{
|
||||||
|
<<"measurement">> => Measurement,
|
||||||
|
<<"data_type">> => DataType,
|
||||||
|
<<"value">> => Value
|
||||||
|
}.
|
||||||
|
|
||||||
|
preproc_data(
|
||||||
|
#{
|
||||||
|
<<"measurement">> := Measurement,
|
||||||
|
<<"data_type">> := DataType,
|
||||||
|
<<"value">> := Value
|
||||||
|
} = Data
|
||||||
|
) ->
|
||||||
|
#{
|
||||||
|
timestamp => emqx_plugin_libs_rule:preproc_tmpl(
|
||||||
|
maps:get(<<"timestamp">>, Data, <<"now">>)
|
||||||
|
),
|
||||||
|
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
||||||
|
data_type => DataType,
|
||||||
|
value => emqx_plugin_libs_rule:preproc_tmpl(Value)
|
||||||
|
}.
|
||||||
|
|
||||||
|
preproc_data_list(DataList) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(
|
fun preproc_data/1,
|
||||||
#{
|
|
||||||
measurement := Measurement,
|
|
||||||
data_type := DataType,
|
|
||||||
value := Value
|
|
||||||
} = Data
|
|
||||||
) ->
|
|
||||||
#{
|
|
||||||
timestamp => emqx_plugin_libs_rule:preproc_tmpl(
|
|
||||||
maps:get(<<"timestamp">>, Data, <<"now">>)
|
|
||||||
),
|
|
||||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
|
||||||
data_type => DataType,
|
|
||||||
value => emqx_plugin_libs_rule:preproc_tmpl(Value)
|
|
||||||
}
|
|
||||||
end,
|
|
||||||
DataList
|
DataList
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -258,12 +276,13 @@ convert_float(Str) when is_binary(Str) ->
|
||||||
convert_float(undefined) ->
|
convert_float(undefined) ->
|
||||||
null.
|
null.
|
||||||
|
|
||||||
make_iotdb_insert_request(Message, State) ->
|
make_iotdb_insert_request(MessageUnparsedPayload, State) ->
|
||||||
|
Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
|
||||||
IsAligned = maps:get(is_aligned, State, false),
|
IsAligned = maps:get(is_aligned, State, false),
|
||||||
DeviceId = device_id(Message, State),
|
DeviceId = device_id(Message, State),
|
||||||
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
|
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
|
||||||
Payload = make_list(maps:get(payload, Message)),
|
Payload = make_list(maps:get(payload, Message)),
|
||||||
PreProcessedData = preproc_data(Payload),
|
PreProcessedData = preproc_data_list(Payload),
|
||||||
DataList = proc_data(PreProcessedData, Message),
|
DataList = proc_data(PreProcessedData, Message),
|
||||||
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
|
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
|
||||||
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
|
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
|
||||||
|
@ -330,15 +349,15 @@ insert_value(1, Data, [Value | Values]) ->
|
||||||
insert_value(Index, Data, [Value | Values]) ->
|
insert_value(Index, Data, [Value | Values]) ->
|
||||||
[[null | Value] | insert_value(Index - 1, Data, Values)].
|
[[null | Value] | insert_value(Index - 1, Data, Values)].
|
||||||
|
|
||||||
iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
|
iotdb_field_key(is_aligned, ?VSN_1_X) ->
|
||||||
<<"is_aligned">>;
|
<<"is_aligned">>;
|
||||||
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
|
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
|
||||||
<<"isAligned">>;
|
<<"isAligned">>;
|
||||||
iotdb_field_key(device_id, ?VSN_1_0_X) ->
|
iotdb_field_key(device_id, ?VSN_1_X) ->
|
||||||
<<"device">>;
|
<<"device">>;
|
||||||
iotdb_field_key(device_id, ?VSN_0_13_X) ->
|
iotdb_field_key(device_id, ?VSN_0_13_X) ->
|
||||||
<<"deviceId">>;
|
<<"deviceId">>;
|
||||||
iotdb_field_key(data_types, ?VSN_1_0_X) ->
|
iotdb_field_key(data_types, ?VSN_1_X) ->
|
||||||
<<"data_types">>;
|
<<"data_types">>;
|
||||||
iotdb_field_key(data_types, ?VSN_0_13_X) ->
|
iotdb_field_key(data_types, ?VSN_0_13_X) ->
|
||||||
<<"dataTypes">>.
|
<<"dataTypes">>.
|
||||||
|
@ -350,6 +369,8 @@ device_id(Message, State) ->
|
||||||
case maps:get(device_id, State, undefined) of
|
case maps:get(device_id, State, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
case maps:get(payload, Message) of
|
case maps:get(payload, Message) of
|
||||||
|
#{<<"device_id">> := DeviceId} ->
|
||||||
|
DeviceId;
|
||||||
#{device_id := DeviceId} ->
|
#{device_id := DeviceId} ->
|
||||||
DeviceId;
|
DeviceId;
|
||||||
_NotFound ->
|
_NotFound ->
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
|
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
|
||||||
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||||
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||||
|
|
|
@ -57,6 +57,14 @@ fields(config) ->
|
||||||
sensitive => true,
|
sensitive => true,
|
||||||
desc => ?DESC("authentication")
|
desc => ?DESC("authentication")
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{connect_timeout,
|
||||||
|
mk(
|
||||||
|
emqx_schema:duration_ms(),
|
||||||
|
#{
|
||||||
|
default => <<"5s">>,
|
||||||
|
desc => ?DESC("connect_timeout")
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields();
|
] ++ emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(producer_opts) ->
|
fields(producer_opts) ->
|
||||||
|
|
|
@ -48,6 +48,7 @@
|
||||||
memory_overload_protection := boolean()
|
memory_overload_protection := boolean()
|
||||||
},
|
},
|
||||||
compression := compression_mode(),
|
compression := compression_mode(),
|
||||||
|
connect_timeout := emqx_schema:duration_ms(),
|
||||||
max_batch_bytes := emqx_schema:bytesize(),
|
max_batch_bytes := emqx_schema:bytesize(),
|
||||||
message := message_template_raw(),
|
message := message_template_raw(),
|
||||||
pulsar_topic := binary(),
|
pulsar_topic := binary(),
|
||||||
|
@ -81,7 +82,9 @@ on_start(InstanceId, Config) ->
|
||||||
Servers = format_servers(Servers0),
|
Servers = format_servers(Servers0),
|
||||||
ClientId = make_client_id(InstanceId, BridgeName),
|
ClientId = make_client_id(InstanceId, BridgeName),
|
||||||
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
||||||
|
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
|
||||||
ClientOpts = #{
|
ClientOpts = #{
|
||||||
|
connect_timeout => ConnectTimeout,
|
||||||
ssl_opts => SSLOpts,
|
ssl_opts => SSLOpts,
|
||||||
conn_opts => conn_opts(Config)
|
conn_opts => conn_opts(Config)
|
||||||
},
|
},
|
||||||
|
@ -96,13 +99,19 @@ on_start(InstanceId, Config) ->
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "failed_to_start_pulsar_client",
|
msg => "failed_to_start_pulsar_client",
|
||||||
instance_id => InstanceId,
|
instance_id => InstanceId,
|
||||||
pulsar_hosts => Servers,
|
pulsar_hosts => Servers,
|
||||||
reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
|
reason => RedactedReason
|
||||||
}),
|
}),
|
||||||
throw(failed_to_start_pulsar_client)
|
Message =
|
||||||
|
case get_error_message(RedactedReason) of
|
||||||
|
{ok, Msg} -> Msg;
|
||||||
|
error -> failed_to_start_pulsar_client
|
||||||
|
end,
|
||||||
|
throw(Message)
|
||||||
end,
|
end,
|
||||||
start_producer(Config, InstanceId, ClientId, ClientOpts).
|
start_producer(Config, InstanceId, ClientId, ClientOpts).
|
||||||
|
|
||||||
|
@ -422,3 +431,19 @@ partition_strategy(Strategy) -> Strategy.
|
||||||
|
|
||||||
is_sensitive_key(auth_data) -> true;
|
is_sensitive_key(auth_data) -> true;
|
||||||
is_sensitive_key(_) -> false.
|
is_sensitive_key(_) -> false.
|
||||||
|
|
||||||
|
get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
|
||||||
|
Iter = maps:iterator(BrokerErrorMap),
|
||||||
|
do_get_error_message(Iter);
|
||||||
|
get_error_message(_Error) ->
|
||||||
|
error.
|
||||||
|
|
||||||
|
do_get_error_message(Iter) ->
|
||||||
|
case maps:next(Iter) of
|
||||||
|
{{_Broker, _Port}, #{message := Message}, _NIter} ->
|
||||||
|
{ok, Message};
|
||||||
|
{_K, _V, NIter} ->
|
||||||
|
do_get_error_message(NIter);
|
||||||
|
none ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_rabbitmq, [
|
{application, emqx_bridge_rabbitmq, [
|
||||||
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
{description, "EMQX Enterprise RabbitMQ Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
|
{applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
|
@ -72,7 +72,7 @@ fields(config) ->
|
||||||
desc => ?DESC("username")
|
desc => ?DESC("username")
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{password, fun emqx_connector_schema_lib:password/1},
|
{password, fun emqx_connector_schema_lib:password_required/1},
|
||||||
{pool_size,
|
{pool_size,
|
||||||
hoconsc:mk(
|
hoconsc:mk(
|
||||||
typerefl:pos_integer(),
|
typerefl:pos_integer(),
|
||||||
|
|
|
@ -508,6 +508,7 @@ fields("node") ->
|
||||||
desc => ?DESC(node_crash_dump_file),
|
desc => ?DESC(node_crash_dump_file),
|
||||||
default => crash_dump_file_default(),
|
default => crash_dump_file_default(),
|
||||||
importance => ?IMPORTANCE_HIDDEN,
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
|
converter => fun ensure_unicode_path/2,
|
||||||
'readOnly' => true
|
'readOnly' => true
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -755,6 +756,7 @@ fields("rpc") ->
|
||||||
file(),
|
file(),
|
||||||
#{
|
#{
|
||||||
mapping => "gen_rpc.certfile",
|
mapping => "gen_rpc.certfile",
|
||||||
|
converter => fun ensure_unicode_path/2,
|
||||||
desc => ?DESC(rpc_certfile)
|
desc => ?DESC(rpc_certfile)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -763,6 +765,7 @@ fields("rpc") ->
|
||||||
file(),
|
file(),
|
||||||
#{
|
#{
|
||||||
mapping => "gen_rpc.keyfile",
|
mapping => "gen_rpc.keyfile",
|
||||||
|
converter => fun ensure_unicode_path/2,
|
||||||
desc => ?DESC(rpc_keyfile)
|
desc => ?DESC(rpc_keyfile)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -771,6 +774,7 @@ fields("rpc") ->
|
||||||
file(),
|
file(),
|
||||||
#{
|
#{
|
||||||
mapping => "gen_rpc.cacertfile",
|
mapping => "gen_rpc.cacertfile",
|
||||||
|
converter => fun ensure_unicode_path/2,
|
||||||
desc => ?DESC(rpc_cacertfile)
|
desc => ?DESC(rpc_cacertfile)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -897,10 +901,11 @@ fields("log_file_handler") ->
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("log_file_handler_file"),
|
desc => ?DESC("log_file_handler_file"),
|
||||||
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
|
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
|
||||||
converter => fun emqx_schema:naive_env_interpolation/1,
|
|
||||||
validator => fun validate_file_location/1,
|
|
||||||
aliases => [file],
|
aliases => [file],
|
||||||
importance => ?IMPORTANCE_HIGH
|
importance => ?IMPORTANCE_HIGH,
|
||||||
|
converter => fun(Path, Opts) ->
|
||||||
|
emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
|
||||||
|
end
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"rotation_count",
|
{"rotation_count",
|
||||||
|
@ -1318,11 +1323,6 @@ emqx_schema_high_prio_roots() ->
|
||||||
)},
|
)},
|
||||||
lists:keyreplace("authorization", 1, Roots, Authz).
|
lists:keyreplace("authorization", 1, Roots, Authz).
|
||||||
|
|
||||||
validate_file_location(File) ->
|
|
||||||
ValidFile = "^[/\\_a-zA-Z0-9\\.\\-]*$",
|
|
||||||
Error = "Invalid file name: " ++ ValidFile,
|
|
||||||
validator_string_re(File, ValidFile, Error).
|
|
||||||
|
|
||||||
validate_time_offset(Offset) ->
|
validate_time_offset(Offset) ->
|
||||||
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
|
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
|
||||||
Error =
|
Error =
|
||||||
|
@ -1356,3 +1356,20 @@ ensure_file_handlers(Conf, _Opts) ->
|
||||||
convert_rotation(undefined, _Opts) -> undefined;
|
convert_rotation(undefined, _Opts) -> undefined;
|
||||||
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
|
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
|
||||||
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
|
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
|
||||||
|
|
||||||
|
ensure_unicode_path(undefined, _) ->
|
||||||
|
undefined;
|
||||||
|
ensure_unicode_path(Path, #{make_serializable := true}) ->
|
||||||
|
%% format back to serializable string
|
||||||
|
unicode:characters_to_binary(Path, utf8);
|
||||||
|
ensure_unicode_path(Path, Opts) when is_binary(Path) ->
|
||||||
|
case unicode:characters_to_list(Path, utf8) of
|
||||||
|
{R, _, _} when R =:= error orelse R =:= incomplete ->
|
||||||
|
throw({"bad_file_path_string", Path});
|
||||||
|
PathStr ->
|
||||||
|
ensure_unicode_path(PathStr, Opts)
|
||||||
|
end;
|
||||||
|
ensure_unicode_path(Path, _) when is_list(Path) ->
|
||||||
|
Path;
|
||||||
|
ensure_unicode_path(Path, _) ->
|
||||||
|
throw({"not_string", Path}).
|
||||||
|
|
|
@ -438,3 +438,63 @@ ensure_acl_conf() ->
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> file:write_file(File, <<"">>)
|
false -> file:write_file(File, <<"">>)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
log_path_test_() ->
|
||||||
|
Fh = fun(Path) ->
|
||||||
|
#{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}}
|
||||||
|
end,
|
||||||
|
Assert = fun(Name, Path, Conf) ->
|
||||||
|
?assertMatch(#{log := #{file := #{Name := #{to := Path}}}}, Conf)
|
||||||
|
end,
|
||||||
|
|
||||||
|
[
|
||||||
|
{"default-values", fun() -> Assert(default, "log/emqx.log", check(#{})) end},
|
||||||
|
{"file path with space", fun() -> Assert(name1, "a /b", check(Fh(<<"a /b">>))) end},
|
||||||
|
{"windows", fun() -> Assert(name1, "c:\\a\\ b\\", check(Fh(<<"c:\\a\\ b\\">>))) end},
|
||||||
|
{"unicoded", fun() -> Assert(name1, "路 径", check(Fh(<<"路 径"/utf8>>))) end},
|
||||||
|
{"bad utf8", fun() ->
|
||||||
|
?assertThrow(
|
||||||
|
{emqx_conf_schema, [
|
||||||
|
#{
|
||||||
|
kind := validation_error,
|
||||||
|
mismatches :=
|
||||||
|
#{
|
||||||
|
"handler_name" :=
|
||||||
|
#{
|
||||||
|
kind := validation_error,
|
||||||
|
path := "log.file.name1.to",
|
||||||
|
reason := {"bad_file_path_string", _}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>))
|
||||||
|
)
|
||||||
|
end},
|
||||||
|
{"not string", fun() ->
|
||||||
|
?assertThrow(
|
||||||
|
{emqx_conf_schema, [
|
||||||
|
#{
|
||||||
|
kind := validation_error,
|
||||||
|
mismatches :=
|
||||||
|
#{
|
||||||
|
"handler_name" :=
|
||||||
|
#{
|
||||||
|
kind := validation_error,
|
||||||
|
path := "log.file.name1.to",
|
||||||
|
reason := {"not_string", _}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
check(Fh(#{<<"foo">> => <<"bar">>}))
|
||||||
|
)
|
||||||
|
end}
|
||||||
|
].
|
||||||
|
|
||||||
|
check(Config) ->
|
||||||
|
Schema = emqx_conf_schema,
|
||||||
|
{_, Conf} = hocon_tconf:map(Schema, Config, [log], #{
|
||||||
|
atom_key => false, required => false, format => map
|
||||||
|
}),
|
||||||
|
emqx_utils_maps:unsafe_atom_key_map(Conf).
|
||||||
|
|
|
@ -406,7 +406,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
|
||||||
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
||||||
case do_get_status(PoolName, Timeout) of
|
case do_get_status(PoolName, Timeout) of
|
||||||
ok ->
|
ok ->
|
||||||
{connected, State};
|
connected;
|
||||||
|
{error, still_connecting} ->
|
||||||
|
connecting;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{disconnected, State, Reason}
|
{disconnected, State, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -428,7 +430,8 @@ do_get_status(PoolName, Timeout) ->
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
|
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
|
||||||
% we crash in case of non-empty lists since we don't know what to do in that case
|
[] ->
|
||||||
|
{error, still_connecting};
|
||||||
[_ | _] = Results ->
|
[_ | _] = Results ->
|
||||||
case [E || {error, _} = E <- Results] of
|
case [E || {error, _} = E <- Results] of
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
database/1,
|
database/1,
|
||||||
username/1,
|
username/1,
|
||||||
password/1,
|
password/1,
|
||||||
|
password_required/1,
|
||||||
auto_reconnect/1
|
auto_reconnect/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -104,6 +105,14 @@ password(sensitive) -> true;
|
||||||
password(converter) -> fun emqx_schema:password_converter/2;
|
password(converter) -> fun emqx_schema:password_converter/2;
|
||||||
password(_) -> undefined.
|
password(_) -> undefined.
|
||||||
|
|
||||||
|
password_required(type) -> binary();
|
||||||
|
password_required(desc) -> ?DESC("password");
|
||||||
|
password_required(required) -> true;
|
||||||
|
password_required(format) -> <<"password">>;
|
||||||
|
password_required(sensitive) -> true;
|
||||||
|
password_required(converter) -> fun emqx_schema:password_converter/2;
|
||||||
|
password_required(_) -> undefined.
|
||||||
|
|
||||||
auto_reconnect(type) -> boolean();
|
auto_reconnect(type) -> boolean();
|
||||||
auto_reconnect(desc) -> ?DESC("auto_reconnect");
|
auto_reconnect(desc) -> ?DESC("auto_reconnect");
|
||||||
auto_reconnect(default) -> true;
|
auto_reconnect(default) -> true;
|
||||||
|
|
|
@ -192,7 +192,9 @@ ranch_opts(Options) ->
|
||||||
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
|
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
|
||||||
|
|
||||||
proto_opts(#{proxy_header := ProxyHeader}) ->
|
proto_opts(#{proxy_header := ProxyHeader}) ->
|
||||||
#{proxy_header => ProxyHeader}.
|
#{proxy_header => ProxyHeader};
|
||||||
|
proto_opts(_Opts) ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
filter_false(_K, false, S) -> S;
|
filter_false(_K, false, S) -> S;
|
||||||
filter_false(K, V, S) -> [{K, V} | S].
|
filter_false(K, V, S) -> [{K, V} | S].
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_eviction_agent, [
|
{application, emqx_eviction_agent, [
|
||||||
{description, "EMQX Eviction Agent"},
|
{description, "EMQX Eviction Agent"},
|
||||||
{vsn, "5.0.0"},
|
{vsn, "5.0.1"},
|
||||||
{registered, [
|
{registered, [
|
||||||
emqx_eviction_agent_sup,
|
emqx_eviction_agent_sup,
|
||||||
emqx_eviction_agent,
|
emqx_eviction_agent,
|
||||||
|
|
|
@ -218,10 +218,10 @@ cancel_expiry_timer(_) ->
|
||||||
|
|
||||||
set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
|
set_expiry_timer(#{conninfo := ConnInfo} = Channel) ->
|
||||||
case maps:get(expiry_interval, ConnInfo) of
|
case maps:get(expiry_interval, ConnInfo) of
|
||||||
?UINT_MAX ->
|
?EXPIRE_INTERVAL_INFINITE ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
I when I > 0 ->
|
I when I > 0 ->
|
||||||
Timer = erlang:send_after(timer:seconds(I), self(), expire_session),
|
Timer = erlang:send_after(I, self(), expire_session),
|
||||||
{ok, Channel#{expiry_timer => Timer}};
|
{ok, Channel#{expiry_timer => Timer}};
|
||||||
_ ->
|
_ ->
|
||||||
{error, should_be_expired}
|
{error, should_be_expired}
|
||||||
|
|
|
@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) ->
|
||||||
?assert(false, "Connection not evicted")
|
?assert(false, "Connection not evicted")
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
#{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid},
|
#{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid},
|
||||||
2000
|
2000
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -383,7 +383,7 @@ t_ws_conn(_Config) ->
|
||||||
|
|
||||||
?assertWaitEvent(
|
?assertWaitEvent(
|
||||||
ok = emqx_eviction_agent:evict_connections(1),
|
ok = emqx_eviction_agent:evict_connections(1),
|
||||||
#{?snk_kind := emqx_cm_connected_client_count_dec},
|
#{?snk_kind := emqx_cm_connected_client_count_dec_done},
|
||||||
1000
|
1000
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -418,7 +418,7 @@ t_quic_conn(_Config) ->
|
||||||
|
|
||||||
?assertWaitEvent(
|
?assertWaitEvent(
|
||||||
ok = emqx_eviction_agent:evict_connections(1),
|
ok = emqx_eviction_agent:evict_connections(1),
|
||||||
#{?snk_kind := emqx_cm_connected_client_count_dec},
|
#{?snk_kind := emqx_cm_connected_client_count_dec_done},
|
||||||
1000
|
1000
|
||||||
),
|
),
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_channel.hrl").
|
||||||
|
|
||||||
-define(CLIENT_ID, <<"client_with_session">>).
|
-define(CLIENT_ID, <<"client_with_session">>).
|
||||||
|
|
||||||
|
@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) ->
|
||||||
conninfo => #{
|
conninfo => #{
|
||||||
clientid => ?CLIENT_ID,
|
clientid => ?CLIENT_ID,
|
||||||
receive_maximum => 32,
|
receive_maximum => 32,
|
||||||
expiry_interval => ?UINT_MAX
|
expiry_interval => ?EXPIRE_INTERVAL_INFINITE
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
|
|
@ -139,9 +139,9 @@ lookup(#{topic := Topic}) ->
|
||||||
%%%==============================================================================================
|
%%%==============================================================================================
|
||||||
%% internal
|
%% internal
|
||||||
generate_topic(Params = #{<<"topic">> := Topic}) ->
|
generate_topic(Params = #{<<"topic">> := Topic}) ->
|
||||||
Params#{<<"topic">> => uri_string:percent_decode(Topic)};
|
Params#{<<"topic">> => Topic};
|
||||||
generate_topic(Params = #{topic := Topic}) ->
|
generate_topic(Params = #{topic := Topic}) ->
|
||||||
Params#{topic => uri_string:percent_decode(Topic)};
|
Params#{topic => Topic};
|
||||||
generate_topic(Params) ->
|
generate_topic(Params) ->
|
||||||
Params.
|
Params.
|
||||||
|
|
||||||
|
|
|
@ -92,4 +92,35 @@ t_nodes_api(Config) ->
|
||||||
#{<<"topic">> := Topic, <<"node">> := Node2}
|
#{<<"topic">> := Topic, <<"node">> := Node2}
|
||||||
] = emqx_utils_json:decode(RouteResponse, [return_maps]),
|
] = emqx_utils_json:decode(RouteResponse, [return_maps]),
|
||||||
|
|
||||||
?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])).
|
?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_percent_topics(_Config) ->
|
||||||
|
Node = atom_to_binary(node(), utf8),
|
||||||
|
Topic = <<"test_%%1">>,
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"routes_username">>, clientid => <<"routes_cid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
|
||||||
|
%% exact match with percent encoded topic
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["topics"]),
|
||||||
|
QS = uri_string:compose_query([
|
||||||
|
{"topic", Topic},
|
||||||
|
{"node", atom_to_list(node())}
|
||||||
|
]),
|
||||||
|
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
{ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
|
||||||
|
MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
|
||||||
|
maps:get(<<"meta">>, MatchData)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
[#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
|
maps:get(<<"data">>, MatchData)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Avoid duplicated percent decode the topic name in API `/topics/{topic}` and `/topics`.
|
|
@ -67,6 +67,11 @@ emqx_bridge_pulsar {
|
||||||
label = "Enable or Disable"
|
label = "Enable or Disable"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connect_timeout {
|
||||||
|
desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
|
||||||
|
label = "Connect Timeout"
|
||||||
|
}
|
||||||
|
|
||||||
desc_name {
|
desc_name {
|
||||||
desc = "Bridge name, used as a human-readable description of the bridge."
|
desc = "Bridge name, used as a human-readable description of the bridge."
|
||||||
label = "Bridge Name"
|
label = "Bridge Name"
|
||||||
|
|
|
@ -213,9 +213,6 @@ pending connections can grow to."""
|
||||||
fields_tcp_opts_backlog.label:
|
fields_tcp_opts_backlog.label:
|
||||||
"""TCP backlog length"""
|
"""TCP backlog length"""
|
||||||
|
|
||||||
broker_route_batch_clean.desc:
|
|
||||||
"""Enable batch clean for deleted routes."""
|
|
||||||
|
|
||||||
fields_mqtt_quic_listener_initial_window_packets.desc:
|
fields_mqtt_quic_listener_initial_window_packets.desc:
|
||||||
"""The size (in packets) of the initial congestion window for a connection. Default: 10"""
|
"""The size (in packets) of the initial congestion window for a connection. Default: 10"""
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,11 @@ emqx_bridge_pulsar {
|
||||||
label = "启用或停用"
|
label = "启用或停用"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connect_timeout {
|
||||||
|
desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
|
||||||
|
label = "连接超时时间"
|
||||||
|
}
|
||||||
|
|
||||||
servers {
|
servers {
|
||||||
desc = "以逗号分隔的 <code>scheme://host[:port]</code> 格式的 Pulsar URL 列表,"
|
desc = "以逗号分隔的 <code>scheme://host[:port]</code> 格式的 Pulsar URL 列表,"
|
||||||
"支持的 scheme 有 <code>pulsar://</code> (默认)"
|
"支持的 scheme 有 <code>pulsar://</code> (默认)"
|
||||||
|
|
|
@ -208,9 +208,6 @@ fields_tcp_opts_backlog.desc:
|
||||||
fields_tcp_opts_backlog.label:
|
fields_tcp_opts_backlog.label:
|
||||||
"""TCP 连接队列长度"""
|
"""TCP 连接队列长度"""
|
||||||
|
|
||||||
broker_route_batch_clean.desc:
|
|
||||||
"""是否开启批量清除路由。"""
|
|
||||||
|
|
||||||
fields_mqtt_quic_listener_initial_window_packets.desc:
|
fields_mqtt_quic_listener_initial_window_packets.desc:
|
||||||
"""一个连接的初始拥堵窗口的大小(以包为单位)。默认值:10"""
|
"""一个连接的初始拥堵窗口的大小(以包为单位)。默认值:10"""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue