diff --git a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf index 2a3d038f0..8baddae19 100644 --- a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf +++ b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf @@ -7,6 +7,12 @@ ## Value: single | unknown | sharded | rs auth.mongo.type = single +## Whether to use SRV and TXT records. +## +## Value: true | false +## Default: false +auth.mongo.srv_record = false + ## The set name if type is rs. ## ## Value: String @@ -37,7 +43,6 @@ auth.mongo.pool = 8 ## MongoDB AuthSource ## ## Value: String -## Default: mqtt ## auth.mongo.auth_source = admin ## MongoDB database diff --git a/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema b/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema index 8a2ff98b3..17a83c37c 100644 --- a/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema +++ b/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema @@ -6,8 +6,12 @@ {datatype, {enum, [single, unknown, sharded, rs]}} ]}. +{mapping, "auth.mongo.srv_record", "emqx_auth_mongo.server", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "auth.mongo.rs_set_name", "emqx_auth_mongo.server", [ - {default, "mqtt"}, {datatype, string} ]}. @@ -41,7 +45,6 @@ ]}. {mapping, "auth.mongo.auth_source", "emqx_auth_mongo.server", [ - {default, "mqtt"}, {datatype, string} ]}. @@ -101,9 +104,9 @@ ]}. {translation, "emqx_auth_mongo.server", fun(Conf) -> - H = cuttlefish:conf_get("auth.mongo.server", Conf), - Hosts = string:tokens(H, ","), - Type0 = cuttlefish:conf_get("auth.mongo.type", Conf), + SrvRecord = cuttlefish:conf_get("auth.mongo.srv_record", Conf, false), + Server = cuttlefish:conf_get("auth.mongo.server", Conf), + Type = cuttlefish:conf_get("auth.mongo.type", Conf), Pool = cuttlefish:conf_get("auth.mongo.pool", Conf), %% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0 Login = cuttlefish:conf_get("auth.mongo.username", Conf, @@ -111,7 +114,10 @@ ), Passwd = cuttlefish:conf_get("auth.mongo.password", Conf), DB = cuttlefish:conf_get("auth.mongo.database", Conf), - AuthSrc = cuttlefish:conf_get("auth.mongo.auth_source", Conf), + AuthSource = case cuttlefish:conf_get("auth.mongo.auth_source", Conf, undefined) of + undefined -> []; + AuthSource0 -> [{auth_source, list_to_binary(AuthSource0)}] + end, R = cuttlefish:conf_get("auth.mongo.w_mode", Conf), W = cuttlefish:conf_get("auth.mongo.r_mode", Conf), Login0 = case Login =:= [] of @@ -156,8 +162,8 @@ false -> [] end, - WorkerOptions = [{database, list_to_binary(DB)}, {auth_source, list_to_binary(AuthSrc)}] - ++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl, + WorkerOptions = [{database, list_to_binary(DB)}] + ++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl ++ AuthSource, Vars = cuttlefish_variable:fuzzy_matches(["auth", "mongo", "topology", "$name"], Conf), Options = lists:map(fun({_, Name}) -> @@ -174,16 +180,17 @@ {list_to_atom(Name2), cuttlefish:conf_get("auth.mongo.topology."++Name, Conf)} end, Vars), - Type = case Type0 =:= rs of - true -> {Type0, list_to_binary(cuttlefish:conf_get("auth.mongo.rs_set_name", Conf))}; - false -> Type0 - end, - [{type, Type}, - {hosts, Hosts}, + ReplicaSet = case cuttlefish:conf_get("auth.mongo.rs_set_name", Conf, undefined) of + undefined -> []; + ReplicaSet0 -> [{rs_set_name, list_to_binary(ReplicaSet0)}] + end, + [{srv_record, SrvRecord}, + {type, Type}, + {server, Server}, {options, Options}, {worker_options, WorkerOptions}, {auto_reconnect, 1}, - {pool_size, Pool}] + {pool_size, Pool}] ++ ReplicaSet end}. %% The mongodb operation timeout is specified by the value of `cursor_timeout` from application config, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src index cc4e72ef3..ab0b4ff56 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mongo, [{description, "EMQ X Authentication/ACL with MongoDB"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_mongo_sup]}, {applications, [kernel,stdlib,mongodb,ecpool]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl index 3f27cb1dd..55263494a 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl @@ -28,7 +28,96 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, PoolEnv} = application:get_env(?APP, server), - PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, PoolEnv), + {ok, Opts} = application:get_env(?APP, server), + NOpts = may_parse_srv_and_txt_records(Opts), + PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, NOpts), {ok, {{one_for_all, 10, 100}, [PoolSpec]}}. +may_parse_srv_and_txt_records(Opts) when is_list(Opts) -> + maps:to_list(may_parse_srv_and_txt_records(maps:from_list(Opts))); + +may_parse_srv_and_txt_records(#{type := Type, + srv_record := false, + server := Server} = Opts) -> + Hosts = to_hosts(Server), + case Type =:= rs of + true -> + case maps:get(rs_set_name, Opts, undefined) of + undefined -> + error({missing_parameter, rs_set_name}); + ReplicaSet -> + Opts#{type => {rs, ReplicaSet}, + hosts => Hosts} + end; + false -> + Opts#{hosts => Hosts} + end; + +may_parse_srv_and_txt_records(#{type := Type, + srv_record := true, + server := Server, + worker_options := WorkerOptions} = Opts) -> + Hosts = parse_srv_records(Server), + Opts0 = parse_txt_records(Type, Server), + NWorkerOptions = maps:to_list(maps:merge(maps:from_list(WorkerOptions), maps:with([auth_source], Opts0))), + NOpts = Opts#{hosts => Hosts, worker_options => NWorkerOptions}, + case Type =:= rs of + true -> + case maps:get(rs_set_name, Opts0, maps:get(rs_set_name, NOpts, undefined)) of + undefined -> + error({missing_parameter, rs_set_name}); + ReplicaSet -> + NOpts#{type => {Type, ReplicaSet}} + end; + false -> + NOpts + end. + +to_hosts(Server) -> + [string:trim(H) || H <- string:tokens(Server, ",")]. + +parse_srv_records(Server) -> + case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of + [] -> + error(service_not_found); + Services -> + [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] + end. + +parse_txt_records(Type, Server) -> + case inet_res:lookup(Server, in, txt) of + [] -> + #{}; + [[QueryString]] -> + case uri_string:dissect_query(QueryString) of + {error, _, _} -> + error({invalid_txt_record, invalid_query_string}); + Options -> + Fields = case Type of + rs -> ["authSource", "replicaSet"]; + _ -> ["authSource"] + end, + take_and_convert(Fields, Options) + end; + _ -> + error({invalid_txt_record, multiple_records}) + end. + +take_and_convert(Fields, Options) -> + take_and_convert(Fields, Options, #{}). + +take_and_convert([], [_ | _], _Acc) -> + error({invalid_txt_record, invalid_option}); +take_and_convert([], [], Acc) -> + Acc; +take_and_convert([Field | More], Options, Acc) -> + case lists:keytake(Field, 1, Options) of + {value, {"authSource", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)}); + {value, {"replicaSet", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{rs_set_name => list_to_binary(V)}); + {value, _, _} -> + error({invalid_txt_record, invalid_option}); + false -> + take_and_convert(More, Options, Acc) + end. diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl index 21cf2692e..f22aec41c 100644 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl +++ b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl @@ -24,7 +24,7 @@ -logger_header("[SLOW TOPICS]"). --export([ start_link/1, on_publish_done/5, enable/0 +-export([ start_link/1, on_publish_done/3, enable/0 , disable/0, clear_history/0 ]). @@ -42,7 +42,7 @@ -type state() :: #{ config := proplist:proplist() , period := pos_integer() , last_tick_at := pos_integer() - , counter := counters:counter_ref() + , counter := counters:counters_ref() , enable := boolean() }. @@ -70,6 +70,13 @@ -type slow_log() :: #slow_log{}. -type top_k_map() :: #{emqx_types:topic() => top_k()}. +-type publish_done_env() :: #{ ignore_before_create := boolean() + , threshold := pos_integer() + , counter := counters:counters_ref() + }. + +-type publish_done_args() :: #{session_rebirth_time => pos_integer()}. + -ifdef(TEST). -define(TOPK_ACCESS, public). -else. @@ -90,13 +97,16 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). --spec on_publish_done(message(), - pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok. -on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _) +-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok. +on_publish_done(#message{timestamp = Timestamp}, + #{session_rebirth_time := Created}, + #{ignore_before_create := IgnoreBeforeCreate}) when IgnoreBeforeCreate, Timestamp < Created -> ok; -on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) -> +on_publish_done(#message{timestamp = Timestamp} = Msg, + _, + #{threshold := Threshold, counter := Counter}) -> case ?NOW - Timestamp of Elapsed when Elapsed > Threshold -> case get_log_quota(Counter) of @@ -202,7 +212,7 @@ init_topk_tab(_) -> , {read_concurrency, true} ]). --spec get_log_quota(counter:counter_ref()) -> boolean(). +-spec get_log_quota(counters:counters_ref()) -> boolean(). get_log_quota(Counter) -> case counters:get(Counter, ?QUOTA_IDX) of Quota when Quota > 0 -> @@ -212,7 +222,7 @@ get_log_quota(Counter) -> false end. --spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok. +-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok. set_log_quota(Cfg, Counter) -> MaxLogNum = get_value(max_log_num, Cfg), counters:put(Counter, ?QUOTA_IDX, MaxLogNum). @@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) -> load(IgnoreBeforeCreate, Threshold, Counter) -> _ = emqx:hook('message.publish_done', - fun ?MODULE:on_publish_done/5, - [IgnoreBeforeCreate, Threshold, Counter]), + fun ?MODULE:on_publish_done/3, + [#{ignore_before_create => IgnoreBeforeCreate, + threshold => Threshold, + counter => Counter} + ]), ok. unload() -> - emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5). + emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). -spec get_topic(proplists:proplist()) -> binary(). get_topic(Cfg) -> diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index a5e5b2906..e6c87d69f 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -16,6 +16,7 @@ -module(emqx_trace_api). -include_lib("emqx/include/logger.hrl"). +-include_lib("kernel/include/file.hrl"). %% API -export([ list_trace/2 @@ -74,10 +75,10 @@ download_zip_log(#{name := Name}, _Param) -> TraceFiles = collect_trace_file(TraceLog), ZipDir = emqx_trace:zip_dir(), Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), - ZipFileName = ZipDir ++ TraceLog, - {ok, ZipFile} = zip:zip(ZipFileName, Zips), + ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", + {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), emqx_trace:delete_files_after_send(ZipFileName, Zips), - {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; + {ok, ZipFile}; {error, Reason} -> {error, Reason} end. @@ -88,7 +89,7 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> {ok, Node, Bin} -> ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, ok = file:write_file(ZipName, Bin), - [ZipName | Acc]; + [Node ++ "-" ++ TraceLog | Acc]; {error, Node, Reason} -> ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), Acc @@ -101,20 +102,19 @@ collect_trace_file(TraceLog) -> BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), Files. -%% _page as position and _limit as bytes for front-end reusing components stream_log_file(#{name := Name}, Params) -> Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), - Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>), - Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>), + Position0 = proplists:get_value(<<"position">>, Params, <<"0">>), + Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>), Node = binary_to_existing_atom(Node0), Position = binary_to_integer(Position0), Bytes = binary_to_integer(Bytes0), case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of {ok, Bin} -> - Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes}, + Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => Bin}}; - eof -> - Meta = #{<<"page">> => Position, <<"limit">> => Bytes}, + {eof, Size} -> + Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => <<"">>}}; {error, Reason} -> logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), @@ -134,6 +134,7 @@ read_trace_file(Name, Position, Limit) -> [] -> {error, not_found} end. +-dialyzer({nowarn_function, read_file/3}). read_file(Path, Offset, Bytes) -> {ok, IoDevice} = file:open(Path, [read, raw, binary]), try @@ -141,7 +142,13 @@ read_file(Path, Offset, Bytes) -> 0 -> ok; _ -> file:position(IoDevice, {bof, Offset}) end, - file:read(IoDevice, Bytes) + case file:read(IoDevice, Bytes) of + {ok, Bin} -> {ok, Bin}; + {error, Reason} -> {error, Reason}; + eof -> + #file_info{size = Size} = file:read_file_info(IoDevice), + {eof, Size} + end after file:close(IoDevice) end. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index 1bd28d888..169fd50bc 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -336,9 +336,8 @@ t_download_log(_Config) -> {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], ct:sleep(100), - {ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} = - emqx_trace_api:download_zip_log(#{name => Name}, []), - ?assert(ZipFileSize > 0), + {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), + ?assert(filelib:file_size(ZipFile) > 0), ok = emqtt:disconnect(Client), unload(), ok. diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 4ef423b78..a423fb9b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.4.1"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index fcc5652cc..b6de9d9f6 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -156,7 +156,7 @@ start_expire_timer(0, State) -> start_expire_timer(undefined, State) -> State; start_expire_timer(Ms, State) -> - Timer = erlang:send_after(Ms, self(), stats), + Timer = erlang:send_after(Ms, self(), {expire, Ms}), State#state{expiry_timer = Timer}. handle_call(Req, _From, State) -> @@ -168,12 +168,14 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info(stats, State = #state{stats_fun = StatsFun}) -> + StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), StatsFun(retained_count()), - {noreply, State, hibernate}; + {noreply, State#state{stats_timer = StatsTimer}, hibernate}; -handle_info(expire, State) -> +handle_info({expire, Ms} = Expire, State) -> + Timer = erlang:send_after(Ms, self(), Expire), ok = expire_messages(), - {noreply, State, hibernate}; + {noreply, State#state{expiry_timer = Timer}, hibernate}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 66af0da21..98e5487e2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index dce441b3c..a29390bc1 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -5,7 +5,8 @@ {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{restart_application,emqx_stomp}]}, + [{restart_application,emqx_stomp}, + {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, {"4.3.1",[ diff --git a/apps/emqx_stomp/src/emqx_stomp.erl b/apps/emqx_stomp/src/emqx_stomp.erl index 9eafe3cf7..b8a66009c 100644 --- a/apps/emqx_stomp/src/emqx_stomp.erl +++ b/apps/emqx_stomp/src/emqx_stomp.erl @@ -33,6 +33,8 @@ , stop_listener/3 ]). +-export([force_clear_after_app_stoped/0]). + -export([init/1]). -define(APP, ?MODULE). @@ -52,6 +54,18 @@ start(_StartType, _StartArgs) -> stop(_State) -> stop_listeners(). +force_clear_after_app_stoped() -> + lists:foreach(fun({Name = {ProtoName, _}, _}) -> + case is_stomp_listener(ProtoName) of + true -> esockd:close(Name); + _ -> ok + end + end, esockd:listeners()). + +is_stomp_listener('stomp:tcp') -> true; +is_stomp_listener('stomp:ssl') -> true; +is_stomp_listener(_) -> false. + %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- diff --git a/etc/emqx.conf b/etc/emqx.conf index 7b81e40cc..78d383930 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -101,6 +101,11 @@ cluster.autoclean = 5m ## Value: String ## cluster.dns.app = emqx +## Type of dns record. +## +## Value: Value: a | srv +## cluster.dns.type = a + ##-------------------------------------------------------------------- ## Cluster using etcd diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 24f6feb57..4b7da70d2 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.0"}). +-define(EMQX_RELEASE, {opensource, "4.4.0-alpha.1"}). -else. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src index 724a76237..e78f1c3f6 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard, [{description, "EMQ X Web Dashboard"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.3.7"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel,stdlib,mnesia,minirest]}, diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl index a3abbedf7..1daab1520 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -87,9 +87,12 @@ update_trace(Path, Params) -> download_zip_log(Path, Params) -> case emqx_trace_api:download_zip_log(Path, Params) of - {ok, _Header, _File}= Return -> Return; - {error, _Reason} = Err -> return(Err) + {ok, File} -> minirest:return_file(File); + {error, Reason} -> return({error, 'NOT_FOUND', Reason}) end. stream_log_file(Path, Params) -> - return(emqx_trace_api:stream_log_file(Path, Params)). + case emqx_trace_api:stream_log_file(Path, Params) of + {ok, File} -> return({ok, File}); + {error, Reason} -> return({error, 'NOT_FOUND', Reason}) + end. diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl index 609a2d93c..fc786dbd0 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl @@ -124,14 +124,14 @@ t_stream_log(_Config) -> {ok, FileBin} = file:read_file(File), ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), Header = auth_header_(), - {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header), #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary), ?assertEqual(10, byte_size(Bin)), - ?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta), - Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"), + ?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta), + Path = api_path("trace/test_stream_log/log?position=20&bytes=10"), {ok, Binary1} = request_api(get, Path, Header), #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1), - ?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1), + ?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1), ?assertEqual(10, byte_size(Bin1)), unload(), ok. diff --git a/pkg-vsn.sh b/pkg-vsn.sh index 904690ad1..f3bdb1884 100755 --- a/pkg-vsn.sh +++ b/pkg-vsn.sh @@ -15,9 +15,21 @@ fi ## emqx_release.hrl is the single source of truth for release version RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)" -## git commit hash is added as suffix in case the git tag and release version is not an exact match -if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then +git_exact_vsn() { + local tag + tag="$(git describe --tags --match "[e|v]*" --exact 2>/dev/null)" + echo "$tag" | sed 's/^[v|e]//g' +} + +GIT_EXACT_VSN="$(git_exact_vsn)" +if [ "$GIT_EXACT_VSN" != '' ]; then + if [ "$GIT_EXACT_VSN" != "$RELEASE" ]; then + echo "ERROR: Tagged $GIT_EXACT_VSN, but $RELEASE in include/emqx_release.hrl" 1>&2 + exit 1 + fi + SUFFIX='' +else SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" fi -echo "${RELEASE}${SUFFIX:-}" +echo "${RELEASE}${SUFFIX}" diff --git a/priv/emqx.schema b/priv/emqx.schema index 602de56b9..15f0324cd 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -96,6 +96,11 @@ {datatype, string} ]}. +{mapping, "cluster.dns.type", "ekka.cluster_discovery", [ + {datatype, {enum, [a, srv]}}, + {default, a} +]}. + %%-------------------------------------------------------------------- %% Cluster using etcd @@ -171,7 +176,8 @@ {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; (dns) -> [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, - {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; + {app, cuttlefish:conf_get("cluster.dns.app", Conf)}, + {type, cuttlefish:conf_get("cluster.dns.type", Conf)}]; (etcd) -> SslOpts = fun(Conf) -> Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf), @@ -362,11 +368,35 @@ end}. ]}. %% RPC server port. +{mapping, "rpc.driver", "gen_rpc.driver", +[ {default, tcp} +, {datatype, {enum, [tcp, ssl]}} +]}. + {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ {default, 5369}, {datatype, integer} ]}. +%% RPC SSL server port. +{mapping, "rpc.enable_ssl", "gen_rpc.ssl_server_port", [ + {default, 5369}, + {datatype, integer} +]}. + +%% RPC SSL certificates +{mapping, "rpc.certfile", "gen_rpc.certfile", [ + {datatype, string} +]}. + +{mapping, "rpc.keyfile", "gen_rpc.keyfile", [ + {datatype, string} +]}. + +{mapping, "rpc.cacertfile", "gen_rpc.cacertfile", [ + {datatype, string} +]}. + %% Number of tcp connections when connecting to RPC server {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ {default, 0}, diff --git a/rebar.config b/rebar.config index cc147dec3..cd2cf3a64 100644 --- a/rebar.config +++ b/rebar.config @@ -37,14 +37,14 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.10"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.2"}}} - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.6.0"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b826775d6..d40db1093 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -155,7 +157,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 8d0ff11a8..be6f45e25 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -42,10 +42,13 @@ start(_Type, _Args) -> ekka:start(), {ok, Sup} = emqx_sup:start_link(), ok = start_autocluster(), + %% We need to make sure that emqx's listeners start before plugins + %% and modules. Since if the emqx-conf module/plugin is enabled, it will + %% try to start or update the listeners with the latest configuration + emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), ok = emqx_plugins:init(), _ = emqx_plugins:load(), _ = start_ce_modules(), - emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), register(emqx, self()), ok = emqx_alarm_handler:load(), print_vsn(), diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d4b671e71..6122982ae 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> @@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt} case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> %% execute hook here, because message record will be replaced by pubrel - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> @@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) -> end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> - emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => Session#session.created_at}]), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session = diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index acafeb36f..a46c046ca 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -183,7 +183,8 @@ t_open_session_race_condition(_) -> exit(Winner, kill), receive {'DOWN', _, process, Winner, _} -> ok end, - ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync + ignored = gen_server:call(?CM, ignore, infinity), %% sync + ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> @@ -260,6 +261,7 @@ test_kick_session(Action, Reason) -> ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) end, + ignored = gen_server:call(?CM, ignore, infinity), %% sync ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). @@ -271,10 +273,11 @@ test_kick_session(Action, Reason) -> %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. flush_emqx_pool() -> + Ref = make_ref(), Self = self(), L = lists:seq(1, 1000), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), - lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I, Ref} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I, Ref} -> ok end end, L). t_discard_session_race(_) -> ClientId = rand_client_id(),