Merge remote-tracking branch 'origin/main-v4.4' into chore/rename-packages-name

This commit is contained in:
Zaiming Shi 2021-11-16 09:47:13 +01:00
commit 88efc6612d
24 changed files with 275 additions and 75 deletions

View File

@ -7,6 +7,12 @@
## Value: single | unknown | sharded | rs
auth.mongo.type = single
## Whether to use SRV and TXT records.
##
## Value: true | false
## Default: false
auth.mongo.srv_record = false
## The set name if type is rs.
##
## Value: String
@ -37,7 +43,6 @@ auth.mongo.pool = 8
## MongoDB AuthSource
##
## Value: String
## Default: mqtt
## auth.mongo.auth_source = admin
## MongoDB database

View File

@ -6,8 +6,12 @@
{datatype, {enum, [single, unknown, sharded, rs]}}
]}.
{mapping, "auth.mongo.srv_record", "emqx_auth_mongo.server", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "auth.mongo.rs_set_name", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -41,7 +45,6 @@
]}.
{mapping, "auth.mongo.auth_source", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -101,9 +104,9 @@
]}.
{translation, "emqx_auth_mongo.server", fun(Conf) ->
H = cuttlefish:conf_get("auth.mongo.server", Conf),
Hosts = string:tokens(H, ","),
Type0 = cuttlefish:conf_get("auth.mongo.type", Conf),
SrvRecord = cuttlefish:conf_get("auth.mongo.srv_record", Conf, false),
Server = cuttlefish:conf_get("auth.mongo.server", Conf),
Type = cuttlefish:conf_get("auth.mongo.type", Conf),
Pool = cuttlefish:conf_get("auth.mongo.pool", Conf),
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
Login = cuttlefish:conf_get("auth.mongo.username", Conf,
@ -111,7 +114,10 @@
),
Passwd = cuttlefish:conf_get("auth.mongo.password", Conf),
DB = cuttlefish:conf_get("auth.mongo.database", Conf),
AuthSrc = cuttlefish:conf_get("auth.mongo.auth_source", Conf),
AuthSource = case cuttlefish:conf_get("auth.mongo.auth_source", Conf, undefined) of
undefined -> [];
AuthSource0 -> [{auth_source, list_to_binary(AuthSource0)}]
end,
R = cuttlefish:conf_get("auth.mongo.w_mode", Conf),
W = cuttlefish:conf_get("auth.mongo.r_mode", Conf),
Login0 = case Login =:= [] of
@ -156,8 +162,8 @@
false -> []
end,
WorkerOptions = [{database, list_to_binary(DB)}, {auth_source, list_to_binary(AuthSrc)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl,
WorkerOptions = [{database, list_to_binary(DB)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl ++ AuthSource,
Vars = cuttlefish_variable:fuzzy_matches(["auth", "mongo", "topology", "$name"], Conf),
Options = lists:map(fun({_, Name}) ->
@ -174,16 +180,17 @@
{list_to_atom(Name2), cuttlefish:conf_get("auth.mongo.topology."++Name, Conf)}
end, Vars),
Type = case Type0 =:= rs of
true -> {Type0, list_to_binary(cuttlefish:conf_get("auth.mongo.rs_set_name", Conf))};
false -> Type0
end,
[{type, Type},
{hosts, Hosts},
ReplicaSet = case cuttlefish:conf_get("auth.mongo.rs_set_name", Conf, undefined) of
undefined -> [];
ReplicaSet0 -> [{rs_set_name, list_to_binary(ReplicaSet0)}]
end,
[{srv_record, SrvRecord},
{type, Type},
{server, Server},
{options, Options},
{worker_options, WorkerOptions},
{auto_reconnect, 1},
{pool_size, Pool}]
{pool_size, Pool}] ++ ReplicaSet
end}.
%% The mongodb operation timeout is specified by the value of `cursor_timeout` from application config,

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mongo,
[{description, "EMQ X Authentication/ACL with MongoDB"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool]},

View File

@ -28,7 +28,96 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, PoolEnv} = application:get_env(?APP, server),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, PoolEnv),
{ok, Opts} = application:get_env(?APP, server),
NOpts = may_parse_srv_and_txt_records(Opts),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, NOpts),
{ok, {{one_for_all, 10, 100}, [PoolSpec]}}.
may_parse_srv_and_txt_records(Opts) when is_list(Opts) ->
maps:to_list(may_parse_srv_and_txt_records(maps:from_list(Opts)));
may_parse_srv_and_txt_records(#{type := Type,
srv_record := false,
server := Server} = Opts) ->
Hosts = to_hosts(Server),
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts, undefined) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
Opts#{type => {rs, ReplicaSet},
hosts => Hosts}
end;
false ->
Opts#{hosts => Hosts}
end;
may_parse_srv_and_txt_records(#{type := Type,
srv_record := true,
server := Server,
worker_options := WorkerOptions} = Opts) ->
Hosts = parse_srv_records(Server),
Opts0 = parse_txt_records(Type, Server),
NWorkerOptions = maps:to_list(maps:merge(maps:from_list(WorkerOptions), maps:with([auth_source], Opts0))),
NOpts = Opts#{hosts => Hosts, worker_options => NWorkerOptions},
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts0, maps:get(rs_set_name, NOpts, undefined)) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
NOpts#{type => {Type, ReplicaSet}}
end;
false ->
NOpts
end.
to_hosts(Server) ->
[string:trim(H) || H <- string:tokens(Server, ",")].
parse_srv_records(Server) ->
case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
[] ->
error(service_not_found);
Services ->
[Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services]
end.
parse_txt_records(Type, Server) ->
case inet_res:lookup(Server, in, txt) of
[] ->
#{};
[[QueryString]] ->
case uri_string:dissect_query(QueryString) of
{error, _, _} ->
error({invalid_txt_record, invalid_query_string});
Options ->
Fields = case Type of
rs -> ["authSource", "replicaSet"];
_ -> ["authSource"]
end,
take_and_convert(Fields, Options)
end;
_ ->
error({invalid_txt_record, multiple_records})
end.
take_and_convert(Fields, Options) ->
take_and_convert(Fields, Options, #{}).
take_and_convert([], [_ | _], _Acc) ->
error({invalid_txt_record, invalid_option});
take_and_convert([], [], Acc) ->
Acc;
take_and_convert([Field | More], Options, Acc) ->
case lists:keytake(Field, 1, Options) of
{value, {"authSource", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)});
{value, {"replicaSet", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{rs_set_name => list_to_binary(V)});
{value, _, _} ->
error({invalid_txt_record, invalid_option});
false ->
take_and_convert(More, Options, Acc)
end.

View File

@ -24,7 +24,7 @@
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/5, enable/0
-export([ start_link/1, on_publish_done/3, enable/0
, disable/0, clear_history/0
]).
@ -42,7 +42,7 @@
-type state() :: #{ config := proplist:proplist()
, period := pos_integer()
, last_tick_at := pos_integer()
, counter := counters:counter_ref()
, counter := counters:counters_ref()
, enable := boolean()
}.
@ -70,6 +70,13 @@
-type slow_log() :: #slow_log{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
-type publish_done_env() :: #{ ignore_before_create := boolean()
, threshold := pos_integer()
, counter := counters:counters_ref()
}.
-type publish_done_args() :: #{session_rebirth_time => pos_integer()}.
-ifdef(TEST).
-define(TOPK_ACCESS, public).
-else.
@ -90,13 +97,16 @@
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
-spec on_publish_done(message(),
pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok.
on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _)
-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok.
on_publish_done(#message{timestamp = Timestamp},
#{session_rebirth_time := Created},
#{ignore_before_create := IgnoreBeforeCreate})
when IgnoreBeforeCreate, Timestamp < Created ->
ok;
on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) ->
on_publish_done(#message{timestamp = Timestamp} = Msg,
_,
#{threshold := Threshold, counter := Counter}) ->
case ?NOW - Timestamp of
Elapsed when Elapsed > Threshold ->
case get_log_quota(Counter) of
@ -202,7 +212,7 @@ init_topk_tab(_) ->
, {read_concurrency, true}
]).
-spec get_log_quota(counter:counter_ref()) -> boolean().
-spec get_log_quota(counters:counters_ref()) -> boolean().
get_log_quota(Counter) ->
case counters:get(Counter, ?QUOTA_IDX) of
Quota when Quota > 0 ->
@ -212,7 +222,7 @@ get_log_quota(Counter) ->
false
end.
-spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok.
-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok.
set_log_quota(Cfg, Counter) ->
MaxLogNum = get_value(max_log_num, Cfg),
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) ->
load(IgnoreBeforeCreate, Threshold, Counter) ->
_ = emqx:hook('message.publish_done',
fun ?MODULE:on_publish_done/5,
[IgnoreBeforeCreate, Threshold, Counter]),
fun ?MODULE:on_publish_done/3,
[#{ignore_before_create => IgnoreBeforeCreate,
threshold => Threshold,
counter => Counter}
]),
ok.
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5).
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
-spec get_topic(proplists:proplist()) -> binary().
get_topic(Cfg) ->

View File

@ -16,6 +16,7 @@
-module(emqx_trace_api).
-include_lib("emqx/include/logger.hrl").
-include_lib("kernel/include/file.hrl").
%% API
-export([ list_trace/2
@ -74,10 +75,10 @@ download_zip_log(#{name := Name}, _Param) ->
TraceFiles = collect_trace_file(TraceLog),
ZipDir = emqx_trace:zip_dir(),
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
ZipFileName = ZipDir ++ TraceLog,
{ok, ZipFile} = zip:zip(ZipFileName, Zips),
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
emqx_trace:delete_files_after_send(ZipFileName, Zips),
{ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}};
{ok, ZipFile};
{error, Reason} ->
{error, Reason}
end.
@ -88,7 +89,7 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
{ok, Node, Bin} ->
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
ok = file:write_file(ZipName, Bin),
[ZipName | Acc];
[Node ++ "-" ++ TraceLog | Acc];
{error, Node, Reason} ->
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
Acc
@ -101,20 +102,19 @@ collect_trace_file(TraceLog) ->
BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]),
Files.
%% _page as position and _limit as bytes for front-end reusing components
stream_log_file(#{name := Name}, Params) ->
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>),
Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>),
Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
Node = binary_to_existing_atom(Node0),
Position = binary_to_integer(Position0),
Bytes = binary_to_integer(Bytes0),
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
{ok, Bin} ->
Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes},
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => Bin}};
eof ->
Meta = #{<<"page">> => Position, <<"limit">> => Bytes},
{eof, Size} ->
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => <<"">>}};
{error, Reason} ->
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
@ -134,6 +134,7 @@ read_trace_file(Name, Position, Limit) ->
[] -> {error, not_found}
end.
-dialyzer({nowarn_function, read_file/3}).
read_file(Path, Offset, Bytes) ->
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
try
@ -141,7 +142,13 @@ read_file(Path, Offset, Bytes) ->
0 -> ok;
_ -> file:position(IoDevice, {bof, Offset})
end,
file:read(IoDevice, Bytes)
case file:read(IoDevice, Bytes) of
{ok, Bin} -> {ok, Bin};
{error, Reason} -> {error, Reason};
eof ->
#file_info{size = Size} = file:read_file_info(IoDevice),
{eof, Size}
end
after
file:close(IoDevice)
end.

View File

@ -336,9 +336,8 @@ t_download_log(_Config) ->
{ok, _} = emqtt:connect(Client),
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
ct:sleep(100),
{ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} =
emqx_trace_api:download_zip_log(#{name => Name}, []),
?assert(ZipFileSize > 0),
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
?assert(filelib:file_size(ZipFile) > 0),
ok = emqtt:disconnect(Client),
unload(),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_retainer,
[{description, "EMQ X Retainer"},
{vsn, "4.4.1"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},

View File

@ -156,7 +156,7 @@ start_expire_timer(0, State) ->
start_expire_timer(undefined, State) ->
State;
start_expire_timer(Ms, State) ->
Timer = erlang:send_after(Ms, self(), stats),
Timer = erlang:send_after(Ms, self(), {expire, Ms}),
State#state{expiry_timer = Timer}.
handle_call(Req, _From, State) ->
@ -168,12 +168,14 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
StatsFun(retained_count()),
{noreply, State, hibernate};
{noreply, State#state{stats_timer = StatsTimer}, hibernate};
handle_info(expire, State) ->
handle_info({expire, Ms} = Expire, State) ->
Timer = erlang:send_after(Ms, self(), Expire),
ok = expire_messages(),
{noreply, State, hibernate};
{noreply, State#state{expiry_timer = Timer}, hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"},
{vsn, "4.3.6"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -5,7 +5,8 @@
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},
[{restart_application,emqx_stomp},
{apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
{<<".*">>,[]}],
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
{"4.3.1",[

View File

@ -33,6 +33,8 @@
, stop_listener/3
]).
-export([force_clear_after_app_stoped/0]).
-export([init/1]).
-define(APP, ?MODULE).
@ -52,6 +54,18 @@ start(_StartType, _StartArgs) ->
stop(_State) ->
stop_listeners().
force_clear_after_app_stoped() ->
lists:foreach(fun({Name = {ProtoName, _}, _}) ->
case is_stomp_listener(ProtoName) of
true -> esockd:close(Name);
_ -> ok
end
end, esockd:listeners()).
is_stomp_listener('stomp:tcp') -> true;
is_stomp_listener('stomp:ssl') -> true;
is_stomp_listener(_) -> false.
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------

View File

@ -101,6 +101,11 @@ cluster.autoclean = 5m
## Value: String
## cluster.dns.app = emqx
## Type of dns record.
##
## Value: Value: a | srv
## cluster.dns.type = a
##--------------------------------------------------------------------
## Cluster using etcd

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.0"}).
-define(EMQX_RELEASE, {opensource, "4.4.0-alpha.1"}).
-else.

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard,
[{description, "EMQ X Web Dashboard"},
{vsn, "4.3.6"}, % strict semver, bump manually!
{vsn, "4.3.7"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -87,9 +87,12 @@ update_trace(Path, Params) ->
download_zip_log(Path, Params) ->
case emqx_trace_api:download_zip_log(Path, Params) of
{ok, _Header, _File}= Return -> Return;
{error, _Reason} = Err -> return(Err)
{ok, File} -> minirest:return_file(File);
{error, Reason} -> return({error, 'NOT_FOUND', Reason})
end.
stream_log_file(Path, Params) ->
return(emqx_trace_api:stream_log_file(Path, Params)).
case emqx_trace_api:stream_log_file(Path, Params) of
{ok, File} -> return({ok, File});
{error, Reason} -> return({error, 'NOT_FOUND', Reason})
end.

View File

@ -124,14 +124,14 @@ t_stream_log(_Config) ->
{ok, FileBin} = file:read_file(File),
ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
Header = auth_header_(),
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header),
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header),
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary),
?assertEqual(10, byte_size(Bin)),
?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta),
Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"),
?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta),
Path = api_path("trace/test_stream_log/log?position=20&bytes=10"),
{ok, Binary1} = request_api(get, Path, Header),
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1),
?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1),
?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1),
?assertEqual(10, byte_size(Bin1)),
unload(),
ok.

View File

@ -15,9 +15,21 @@ fi
## emqx_release.hrl is the single source of truth for release version
RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)"
## git commit hash is added as suffix in case the git tag and release version is not an exact match
if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then
git_exact_vsn() {
local tag
tag="$(git describe --tags --match "[e|v]*" --exact 2>/dev/null)"
echo "$tag" | sed 's/^[v|e]//g'
}
GIT_EXACT_VSN="$(git_exact_vsn)"
if [ "$GIT_EXACT_VSN" != '' ]; then
if [ "$GIT_EXACT_VSN" != "$RELEASE" ]; then
echo "ERROR: Tagged $GIT_EXACT_VSN, but $RELEASE in include/emqx_release.hrl" 1>&2
exit 1
fi
SUFFIX=''
else
SUFFIX="-$(git rev-parse HEAD | cut -b1-8)"
fi
echo "${RELEASE}${SUFFIX:-}"
echo "${RELEASE}${SUFFIX}"

View File

@ -96,6 +96,11 @@
{datatype, string}
]}.
{mapping, "cluster.dns.type", "ekka.cluster_discovery", [
{datatype, {enum, [a, srv]}},
{default, a}
]}.
%%--------------------------------------------------------------------
%% Cluster using etcd
@ -171,7 +176,8 @@
{loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}];
(dns) ->
[{name, cuttlefish:conf_get("cluster.dns.name", Conf)},
{app, cuttlefish:conf_get("cluster.dns.app", Conf)}];
{app, cuttlefish:conf_get("cluster.dns.app", Conf)},
{type, cuttlefish:conf_get("cluster.dns.type", Conf)}];
(etcd) ->
SslOpts = fun(Conf) ->
Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf),
@ -362,11 +368,35 @@ end}.
]}.
%% RPC server port.
{mapping, "rpc.driver", "gen_rpc.driver",
[ {default, tcp}
, {datatype, {enum, [tcp, ssl]}}
]}.
{mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [
{default, 5369},
{datatype, integer}
]}.
%% RPC SSL server port.
{mapping, "rpc.enable_ssl", "gen_rpc.ssl_server_port", [
{default, 5369},
{datatype, integer}
]}.
%% RPC SSL certificates
{mapping, "rpc.certfile", "gen_rpc.certfile", [
{datatype, string}
]}.
{mapping, "rpc.keyfile", "gen_rpc.keyfile", [
{datatype, string}
]}.
{mapping, "rpc.cacertfile", "gen_rpc.cacertfile", [
{datatype, string}
]}.
%% Number of tcp connections when connecting to RPC server
{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
{default, 0},

View File

@ -37,14 +37,14 @@
{deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.10"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.6.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}

View File

@ -1,6 +1,8 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
[{"4.3.10",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -155,7 +157,9 @@
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
[{"4.3.10",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},

View File

@ -42,10 +42,13 @@ start(_Type, _Args) ->
ekka:start(),
{ok, Sup} = emqx_sup:start_link(),
ok = start_autocluster(),
%% We need to make sure that emqx's listeners start before plugins
%% and modules. Since if the emqx-conf module/plugin is enabled, it will
%% try to start or update the listeners with the latest configuration
emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()),
ok = emqx_plugins:init(),
_ = emqx_plugins:load(),
_ = start_ce_modules(),
emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()),
register(emqx, self()),
ok = emqx_alarm_handler:load(),
print_vsn(),

View File

@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
%% execute hook here, because message record will be replaced by pubrel
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) ->
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => Session#session.created_at}]),
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =

View File

@ -183,7 +183,8 @@ t_open_session_race_condition(_) ->
exit(Winner, kill),
receive {'DOWN', _, process, Winner, _} -> ok end,
ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync
ignored = gen_server:call(?CM, ignore, infinity), %% sync
ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) ->
@ -260,6 +261,7 @@ test_kick_session(Action, Reason) ->
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R))
end,
ignored = gen_server:call(?CM, ignore, infinity), %% sync
ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
@ -271,10 +273,11 @@ test_kick_session(Action, Reason) ->
%% The number of tasks should be large enough to ensure all workers have
%% the chance to work on at least one of the tasks.
flush_emqx_pool() ->
Ref = make_ref(),
Self = self(),
L = lists:seq(1, 1000),
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I, Ref} end, []) end, L),
lists:foreach(fun(I) -> receive {done, I, Ref} -> ok end end, L).
t_discard_session_race(_) ->
ClientId = rand_client_id(),