Merge pull request #8328 from emqx/copy-of-main-v4.3

merge main-v4.3 into main-v4.4
This commit is contained in:
Xinyu Liu 2022-06-27 21:36:59 +08:00 committed by GitHub
commit ae294cccb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 122 additions and 34 deletions

View File

@ -19,6 +19,7 @@ File format:
management HTTPS listeners. [#8129]
- Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag.
- Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address.
- ExHook supports to customize the socket parameters for gRPC client. [#8314]
### Bug fixes
@ -29,6 +30,8 @@ File format:
information to fill them. [#8280]
- Fixed issue in Lua hook that didn't prevent a topic from being
subscribed to. [#8288]
- Ensuring that exhook dispatches the client events are sequential. [#8311]
- Ensure start dashboard ok event if default_username is missing.
## v4.3.15

View File

@ -53,3 +53,20 @@ exhook.server.default.url = http://127.0.0.1:9000
#exhook.server.default.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
#exhook.server.default.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem
#exhook.server.default.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem
## Enables/disables periodic transmission on a connected socket when no other data is exchanged.
## If the other end does not respond, the connection is considered broken and an error message is sent to the controlling process.
##
## Default: true
#exhook.server.default.socket_options.keepalive = true
## If true, option TCP_NODELAY is turned on for the socket, which means that also small amounts of data are sent immediately.
##
## Default: true
#exhook.server.default.socket_options.nodelay = true
## The minimum size of the receive buffer to use for the socket.
#exhook.server.default.socket_options.recbuf = 64KB
## The minimum size of the send buffer to use for the socket.
#exhook.server.default.socket_options.sndbuf = 16KB

View File

@ -51,12 +51,42 @@ end}.
{datatype, string}
]}.
{mapping, "exhook.server.$name.socket_options.keepalive", "emqx_exhook.servers", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "exhook.server.$name.socket_options.nodelay", "emqx_exhook.servers", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "exhook.server.$name.socket_options.recbuf", "emqx_exhook.servers", [
{datatype, bytesize}
]}.
{mapping, "exhook.server.$name.socket_options.sndbuf", "emqx_exhook.servers", [
{datatype, bytesize}
]}.
{translation, "emqx_exhook.servers", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
MkSockOpts = fun(Prefix, Conf) ->
{socket_options,
Filter([{Opt,
cuttlefish:conf_get(Prefix ++ ".socket_options." ++ atom_to_list(Opt),
Conf,
undefined)}
|| Opt <- [keepalive, nodelay, recbuf, sndbuf]])}
end,
ServerOptions = fun(Prefix) ->
case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of
{ok, {http, _, Host, Port, _, _}} ->
[{scheme, http}, {host, Host}, {port, Port}];
[{scheme, http}, {host, Host}, {port, Port},
MkSockOpts(Prefix, Conf)
];
{ok, {https, _, Host, Port, _, _}} ->
[{scheme, https}, {host, Host}, {port, Port},
{ssl_options,
@ -64,10 +94,13 @@ end}.
{certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf, undefined)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf, undefined)},
{cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf, undefined)}
])}];
])},
MkSockOpts(Prefix, Conf)
];
_ -> error(invalid_server_options)
end
end,
[{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)}
|| {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)]
end}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_exhook,
[{description, "EMQ X Extension for Hook"},
{vsn, "4.4.1"},
{vsn, "4.4.2"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.0",
[{"4.4.1",
[{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}]},
{"4.4.0",
[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},
@ -9,7 +11,9 @@
{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]},
{update, emqx_exhook_mngr, {advanced, ["4.4.0"]}}]},
{<<".*">>,[]}],
[{"4.4.0",
[{"4.4.1",
[{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}]},
{"4.4.0",
[{load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook,brutal_purge,soft_purge,[]},
{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]},

View File

@ -126,6 +126,7 @@ channel_opts(Opts) ->
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
SvrAddr = format_http_uri(Scheme, Host, Port),
SockOpts = proplists:get_value(socket_options, Opts),
ClientOpts = case Scheme of
https ->
SslOpts = lists:keydelete(
@ -135,8 +136,10 @@ channel_opts(Opts) ->
),
#{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
transport_opts => SockOpts ++ SslOpts}};
_ ->
#{gun_opts =>
#{transport_opts => SockOpts}}
end,
NClientOpts = ClientOpts#{pool_size => emqx_exhook_mngr:get_pool_size()},
{SvrAddr, NClientOpts}.
@ -277,7 +280,7 @@ match_topic_filter(TopicName, TopicFilter) ->
-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) ->
NReq = Req#{meta => emqx_exhook:request_meta()},
Options = ReqOpts#{channel => ChannName},
Options = ReqOpts#{channel => ChannName, key_dispatch => key_dispatch(NReq)},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of
{ok, Resp, Metadata} ->
@ -335,3 +338,13 @@ available_hooks() ->
'session.created', 'session.subscribed', 'session.unsubscribed',
'session.resumed', 'session.discarded', 'session.takeovered',
'session.terminated' | message_hooks()].
%% @doc Get dispatch_key for each request
key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) ->
ClientId;
key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) ->
ClientId;
key_dispatch(_Req = #{message := #{from := From}}) ->
From;
key_dispatch(_Req) ->
self().

View File

@ -46,7 +46,8 @@ set_special_cfgs(emqx) ->
application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, plugins_loaded_file, undefined),
application:set_env(emqx, modules_loaded_file, undefined);
application:set_env(emqx, modules_loaded_file, undefined),
application:set_env(ekka, cluster_name, ?OTHER_CLUSTER_NAME_ATOM);
set_special_cfgs(emqx_exhook) ->
ok.

View File

@ -140,13 +140,13 @@ list_nodes() ->
lookup_node(Node) -> node_info(Node).
node_info(Node) when Node =:= node() ->
Memory = emqx_vm:get_memory(),
{UsedRatio, Total} = get_sys_memory(),
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
BrokerInfo = emqx_sys:info(),
Info#{node => node(),
otp_release => iolist_to_binary(otp_rel()),
memory_total => proplists:get_value(allocated, Memory),
memory_used => proplists:get_value(used, Memory),
memory_total => Total,
memory_used => erlang:round(Total * UsedRatio),
process_available => erlang:system_info(process_limit),
process_used => erlang:system_info(process_count),
max_fds => proplists:get_value(max_fds,
@ -159,6 +159,14 @@ node_info(Node) when Node =:= node() ->
node_info(Node) ->
rpc_call(Node, node_info, [Node]).
get_sys_memory() ->
case os:type() of
{unix, linux} ->
load_ctl:get_sys_memory();
_ ->
{0, 0}
end.
stopped_node_info(Node) ->
#{name => Node, node_status => 'Stopped'}.

View File

@ -175,7 +175,8 @@ on_action_republish(_Selected, Envs = #{
}) ->
?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
[Topic, ?bound_v('TargetTopic', Envs)]),
emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs));
emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)),
{badact, recursively_republish};
on_action_republish(Selected, _Envs = #{
qos := QoS, flags := Flags, timestamp := Timestamp,

View File

@ -268,13 +268,16 @@ on_action_data_to_webserver(Selected, _Envs =
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _} ->
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
emqx_rule_metrics:inc_actions_error(Id);
emqx_rule_metrics:inc_actions_error(Id),
{badact, StatusCode};
{ok, StatusCode, _, _} ->
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
emqx_rule_metrics:inc_actions_error(Id);
emqx_rule_metrics:inc_actions_error(Id),
{badact, StatusCode};
{error, Reason} ->
?LOG(error, "HTTP request failed path: ~p error: ~p", [NPath, Reason]),
emqx_rule_metrics:inc_actions_error(Id)
emqx_rule_metrics:inc_actions_error(Id),
{badact, Reason}
end.
format_msg([], Data) ->

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.5-beta.2"}).
-define(EMQX_RELEASE, {opensource, "4.4.5-beta.3"}).
-else.

View File

@ -22,6 +22,7 @@
-include("emqx_dashboard.hrl").
-include_lib("emqx/include/logger.hrl").
-define(DEFAULT_PASSWORD, <<"public">>).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
@ -206,11 +207,15 @@ is_valid_pwd(<<Salt:4/binary, Hash/binary>>, Password) ->
%%--------------------------------------------------------------------
init([]) ->
%% Add default admin user
{ok, _} = mnesia:subscribe({table, mqtt_admin, simple}),
PasswordHash = ensure_default_user_in_db(binenv(default_user_username)),
ok = ensure_default_user_passwd_hashed_in_pt(PasswordHash),
ok = maybe_warn_default_pwd(),
case binenv(default_user_username) of
<<>> -> ok;
UserName ->
%% Add default admin user
{ok, _} = mnesia:subscribe({table, mqtt_admin, simple}),
PasswordHash = ensure_default_user_in_db(UserName),
ok = ensure_default_user_passwd_hashed_in_pt(PasswordHash),
ok = maybe_warn_default_pwd()
end,
{ok, state}.
handle_call(_Req, _From, State) ->
@ -220,7 +225,7 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({mnesia_table_event, {write, Admin, _}}, State) ->
%% the password is chagned from another node, sync it to persistent_term
%% the password is changed from another node, sync it to persistent_term
#mqtt_admin{username = Username, password = HashedPassword} = Admin,
case binenv(default_user_username) of
Username ->
@ -258,6 +263,7 @@ salt() ->
binenv(Key) ->
iolist_to_binary(application:get_env(emqx_dashboard, Key, <<>>)).
ensure_default_user_in_db(<<>>) -> <<>>;
ensure_default_user_in_db(Username) ->
F =
fun() ->
@ -279,12 +285,9 @@ ensure_default_user_in_db(Username) ->
initial_default_user_passwd_hashed() ->
case get_default_user_passwd_hashed_from_pt() of
Empty when ?EMPTY_KEY(Empty) ->
%% in case it's not set yet
case binenv(default_user_passwd) of
Empty when ?EMPTY_KEY(Empty) ->
error({missing_configuration, default_user_passwd});
Pwd ->
hash(Pwd)
Empty when ?EMPTY_KEY(Empty) -> hash(?DEFAULT_PASSWORD);
Pwd -> hash(Pwd)
end;
PwdHash ->
PwdHash
@ -300,7 +303,7 @@ get_default_user_passwd_hashed_from_pt() ->
persistent_term:get({?MODULE, default_user_passwd_hashed}, <<>>).
maybe_warn_default_pwd() ->
case is_valid_pwd(initial_default_user_passwd_hashed(), <<"public">>) of
case is_valid_pwd(initial_default_user_passwd_hashed(), ?DEFAULT_PASSWORD) of
true ->
?LOG(warning,
"[Dashboard] Using default password for dashboard 'admin' user. "

View File

@ -45,9 +45,9 @@
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
Binds = fill_client_binds(ClientInfo),

View File

@ -39,12 +39,13 @@
{deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.2"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.5"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.6"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.10"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
@ -61,7 +62,7 @@
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.5"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
]}.
{xref_ignores,

View File

@ -250,6 +250,7 @@ relx_apps(ReleaseType) ->
, inets
, compiler
, runtime_tools
, redbug
, cuttlefish
, emqx
, {mnesia, load}