diff --git a/Makefile b/Makefile index 2449d56fb..823ad37e9 100644 --- a/Makefile +++ b/Makefile @@ -2,25 +2,24 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker -PROJECT_VERSION = 3.0 DEPS = jsx gproc gen_rpc ekka esockd cowboy clique emqx_passwd -dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 -dep_gproc = git https://github.com/uwiger/gproc 0.8.0 -dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0 -dep_esockd = git https://github.com/emqx/esockd v5.4.2 -dep_ekka = git https://github.com/emqx/ekka v0.5.1 -dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 -dep_clique = git https://github.com/emqx/clique develop -dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30 +dep_jsx = hex-emqx 2.9.0 +dep_gproc = hex-emqx 0.8.0 +dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0 +dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.2 +dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1 +dep_cowboy = hex-emqx 2.4.0 +dep_clique = git-emqx https://github.com/emqx/clique develop NO_AUTOPATCH = cuttlefish ERLC_OPTS += +debug_info -DAPPLICATION=emqx BUILD_DEPS = cuttlefish -dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.1 + +dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.1.1 #TEST_DEPS = emqx_ct_helplers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers @@ -37,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ - emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ + emqx_hooks emqx_batch CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -48,9 +48,32 @@ PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets pu DIALYZER_DIRS := ebin/ DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns +GIT_VSN := $(shell git --version | grep -oE "[0-9]{1,2}\.[0-9]{1,2}") +GIT_VSN_17_COMP := $(shell echo -e "$(GIT_VSN)\n1.7" | sort -V | tail -1) + +ifeq ($(GIT_VSN_17_COMP),1.7) +define dep_fetch_git-emqx + git clone -q -n -- $(call dep_repo,$(1)) $(DEPS_DIR)/$(call dep_name,$(1)); \ + cd $(DEPS_DIR)/$(call dep_name,$(1)) && git checkout -q $(call dep_commit,$(1)) +endef +else +define dep_fetch_git-emqx + git clone -q -c advice.detachedHead=false --depth 1 -b $(call dep_commit,$(1)) -- $(call dep_repo,$(1)) $(DEPS_DIR)/$(call dep_name,$(1)) +endef +endif + +core_http_get-emqx = curl -Lf$(if $(filter-out 0,$(V)),,s)o $(call core_native_path,$1) $2 + +define dep_fetch_hex-emqx + mkdir -p $(ERLANG_MK_TMP)/hex $(DEPS_DIR)/$1; \ + $(call core_http_get-emqx,$(ERLANG_MK_TMP)/hex/$1.tar,\ + https://repo.hex.pm/tarballs/$1-$(strip $(word 2,$(dep_$1))).tar); \ + tar -xOf $(ERLANG_MK_TMP)/hex/$1.tar contents.tar.gz | tar -C $(DEPS_DIR)/$1 -xzf -; +endef + include erlang.mk -clean:: gen-clean rebar-clean +clean:: gen-clean .PHONY: gen-clean gen-clean: @@ -109,7 +132,7 @@ rebar-ct: app.config rebar-clean: @rebar3 clean -distclean:: rebar-clean +distclean:: @rm -rf _build cover deps logs log data @rm -f rebar.lock compile_commands.json cuttlefish @@ -120,7 +143,7 @@ comma = , quote = \" curly_l = "{" curly_r = "}" -dep-versions = [$(foreach dep,$(DEPS) $(BUILD_DEPS),$(curly_l)$(dep),$(quote)$(word 3,$(dep_$(dep)))$(quote)$(curly_r)$(comma))[]] +dep-versions = [$(foreach dep,$(DEPS) $(BUILD_DEPS),$(curly_l)$(dep),$(quote)$(word $(words $(dep_$(dep))),$(dep_$(dep)))$(quote)$(curly_r)$(comma))[]] .PHONY: dep-vsn-check dep-vsn-check: diff --git a/README.md b/README.md index f8872e8a0..8d03df8dd 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,15 @@ You can reach the EMQ community and developers via the following channels: Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues). +## MQTT Specifications + +You can read the mqtt protocol via the following links: + +[MQTT Version 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html) + +[MQTT Version 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html) + +[MQTT SN](http://mqtt.org/new/wp-content/uploads/2009/06/MQTT-SN_spec_v1.2.pdf) ## License @@ -74,4 +83,3 @@ Licensed under the Apache License, Version 2.0 (the "License");you may not use t Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - diff --git a/docs/.placeholder b/docs/.placeholder deleted file mode 100644 index e69de29bb..000000000 diff --git a/docs/MQTT-SN_spec_v1.2.pdf b/docs/MQTT-SN_spec_v1.2.pdf deleted file mode 100644 index ee6020d23..000000000 Binary files a/docs/MQTT-SN_spec_v1.2.pdf and /dev/null differ diff --git a/docs/README b/docs/README deleted file mode 100644 index 94f96fb12..000000000 --- a/docs/README +++ /dev/null @@ -1,11 +0,0 @@ - -http://emqttd.io/docs/v2/ - -or - -http://docs.emqtt.com/ - -or - -http://emqttd-docs.rtfd.org - diff --git a/docs/mqtt-v3.1.1.pdf b/docs/mqtt-v3.1.1.pdf deleted file mode 100644 index e4095f1b5..000000000 Binary files a/docs/mqtt-v3.1.1.pdf and /dev/null differ diff --git a/docs/mqtt-v5.0.pdf b/docs/mqtt-v5.0.pdf deleted file mode 100644 index c3d0c9725..000000000 Binary files a/docs/mqtt-v5.0.pdf and /dev/null differ diff --git a/erlang.mk b/erlang.mk index f38d22653..c5d4b4f7f 100644 --- a/erlang.mk +++ b/erlang.mk @@ -1186,7 +1186,7 @@ else fi $(appsrc_verbose) cat src/$(PROJECT).app.src \ | sed "s/{[[:space:]]*modules[[:space:]]*,[[:space:]]*\[\]}/{modules, \[$(call comma_list,$(MODULES))\]}/" \ - | sed "s/{id,[[:space:]]*\"git\"}/{id, \"$(subst /,\/,$(GITDESCRIBE))\"}/" \ + | sed "s/{vsn,[[:space:]]*\"git\"}/{vsn, \"$(subst /,\/,$(GITDESCRIBE))\"}/" \ > ebin/$(PROJECT).app endif diff --git a/etc/emqx.conf b/etc/emqx.conf index 3d18dfb83..c9d772cb6 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1929,7 +1929,7 @@ broker.shared_dispatch_ack_enabled = false ## Enable batch clean for deleted routes. ## ## Value: Flag -broker.route_batch_clean = on +broker.route_batch_clean = off ##-------------------------------------------------------------------- ## System Monitor diff --git a/priv/emqx.schema b/priv/emqx.schema index 3838db31e..1001ab5a8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -426,12 +426,18 @@ end}. {datatype, file} ]}. -{mapping, "sasl", "sasl.sasl_error_logger", [ +{mapping, "log.sasl", "sasl.sasl_error_logger", [ {default, off}, {datatype, flag}, hidden ]}. +{mapping, "log.error_logger", "kernel.error_logger", [ + {default, silent}, + {datatype, {enum, [silent]}}, + hidden +]}. + {translation, "emqx.primary_log_level", fun(Conf) -> cuttlefish:conf_get("log.level", Conf) end}. diff --git a/rebar.config.script b/rebar.config.script index 0b18592f1..aed271af8 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -1,13 +1,21 @@ +CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of + "--depth 1" -> + CONFIG; + _ -> + os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"), + CONFIG + end, + CONFIG1 = case os:getenv("TRAVIS") of - "true" -> - JobId = os:getenv("TRAVIS_JOB_ID"), - [{coveralls_service_job_id, JobId}, - {coveralls_coverdata, "_build/test/cover/*.coverdata"}, - {coveralls_service_name , "travis-ci"} | CONFIG]; - _ -> - CONFIG -end, + "true" -> + JobId = os:getenv("TRAVIS_JOB_ID"), + [{coveralls_service_job_id, JobId}, + {coveralls_coverdata, "_build/test/cover/*.coverdata"}, + {coveralls_service_name , "travis-ci"} | CONFIG]; + _ -> + CONFIG + end, {_, Deps} = lists:keyfind(deps, 1, CONFIG1), {_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1), diff --git a/src/emqx.app.src b/src/emqx.app.src index 77305f6e9..ce643634e 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,6 +1,6 @@ {application,emqx, [{description,"EMQ X Broker"}, - {vsn,"3.0-rc.3"}, + {vsn,"git"}, {modules,[]}, {registered,[emqx_sup]}, {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd, diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 175271306..49a698384 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -43,7 +43,8 @@ mnesia(boot) -> {type, set}, {disc_copies, [node()]}, {record_name, banned}, - {attributes, record_info(fields, banned)}]); + {attributes, record_info(fields, banned)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]); mnesia(copy) -> ok = ekka_mnesia:copy_table(?TAB). diff --git a/src/emqx_batch.erl b/src/emqx_batch.erl new file mode 100644 index 000000000..ffa9a7224 --- /dev/null +++ b/src/emqx_batch.erl @@ -0,0 +1,74 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_batch). + +-export([init/1, push/2, commit/1]). +-export([size/1, items/1]). + +-type(options() :: #{ + batch_size => non_neg_integer(), + linger_ms => pos_integer(), + commit_fun := function() + }). +-export_type([options/0]). + +-record(batch, { + batch_size :: non_neg_integer(), + batch_q :: list(any()), + linger_ms :: pos_integer(), + linger_timer :: reference() | undefined, + commit_fun :: function() + }). +-type(batch() :: #batch{}). +-export_type([batch/0]). + +-spec(init(options()) -> batch()). +init(Opts) when is_map(Opts) -> + #batch{batch_size = maps:get(batch_size, Opts, 1000), + batch_q = [], + linger_ms = maps:get(linger_ms, Opts, 1000), + commit_fun = maps:get(commit_fun, Opts)}. + +-spec(push(any(), batch()) -> batch()). +push(El, Batch = #batch{batch_q = Q, linger_ms = Ms, linger_timer = undefined}) when length(Q) == 0 -> + Batch#batch{batch_q = [El], linger_timer = erlang:send_after(Ms, self(), batch_linger_expired)}; + +%% no limit. +push(El, Batch = #batch{batch_size = 0, batch_q = Q}) -> + Batch#batch{batch_q = [El|Q]}; + +push(El, Batch = #batch{batch_size = MaxSize, batch_q = Q}) when length(Q) >= MaxSize -> + commit(Batch#batch{batch_q = [El|Q]}); + +push(El, Batch = #batch{batch_q = Q}) -> + Batch#batch{batch_q = [El|Q]}. + +-spec(commit(batch()) -> batch()). +commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) -> + _ = Commit(lists:reverse(Q)), + reset(Batch). + +reset(Batch = #batch{linger_timer = TRef}) -> + _ = emqx_misc:cancel_timer(TRef), + Batch#batch{batch_q = [], linger_timer = undefined}. + +-spec(size(batch()) -> non_neg_integer()). +size(#batch{batch_q = Q}) -> + length(Q). + +-spec(items(batch()) -> list(any())). +items(#batch{batch_q = Q}) -> + lists:reverse(Q). + diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index ff53554d4..ca4a86c87 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -33,6 +33,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + -record(state, {pool, id, submap, submon}). -record(subscribe, {topic, subpid, subid, subopts = #{}}). -record(unsubscribe, {topic, subpid, subid}). @@ -327,8 +332,6 @@ handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. - - handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> Subscriber = {SubPid, SubId}, case ets:member(?SUBOPTION, {Topic, Subscriber}) of diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 609ce25d6..3d0ab0df7 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -170,7 +170,7 @@ send_fun(Transport, Socket) -> Data = emqx_frame:serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> - emqx_metrics:inc('bytes/sent', iolist_size(Data)), + emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)), ok; Error -> Error catch @@ -215,6 +215,7 @@ handle_info({timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState }) -> + emqx_metrics:commit(), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NewState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), @@ -248,7 +249,7 @@ handle_info(activate_sock, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> ?LOG(debug, "RECV ~p", [Data]), Size = iolist_size(Data), - emqx_metrics:inc('bytes/received', Size), + emqx_metrics:trans(inc, 'bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 8130a307e..caf862146 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -19,6 +19,7 @@ -export([start_link/0]). -export([new/1, all/0]). -export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). +-export([trans/2, trans/3, trans/4, commit/0]). %% Received/sent metrics -export([received/1, sent/1]). @@ -133,10 +134,8 @@ inc(Metric, Val) when is_atom(Metric) -> %% @doc Increase metric value -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()). -inc(gauge, Metric, Val) -> - update_counter(key(gauge, Metric), {2, Val}); -inc(counter, Metric, Val) -> - update_counter(key(counter, Metric), {2, Val}). +inc(Type, Metric, Val) -> + update_counter(key(Type, Metric), {2, Val}). %% @doc Decrease metric value -spec(dec(gauge, atom()) -> integer()). @@ -154,6 +153,41 @@ set(Metric, Val) when is_atom(Metric) -> set(gauge, Metric, Val) -> ets:insert(?TAB, {key(gauge, Metric), Val}). +trans(inc, Metric) -> + trans(inc, {counter, Metric}, 1). + +trans(Opt, {gauge, Metric}, Val) -> + trans(Opt, gauge, Metric, Val); +trans(inc, {counter, Metric}, Val) -> + trans(inc, counter, Metric, Val); +trans(inc, Metric, Val) when is_atom(Metric) -> + trans(inc, counter, Metric, Val); +trans(dec, gauge, Metric) -> + trans(dec, gauge, Metric, 1). + +trans(inc, Type, Metric, Val) -> + hold(Type, Metric, Val); +trans(dec, gauge, Metric, Val) -> + hold(gauge, Metric, -Val). + +hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge -> + put('$metrics', case get('$metrics') of + undefined -> + #{{Type, Metric} => Val}; + Metrics -> + maps:update_with({Type, Metric}, fun(Cnt) -> Cnt + Val end, Val, Metrics) + end). + +commit() -> + case get('$metrics') of + undefined -> ok; + Metrics -> + maps:fold(fun({Type, Metric}, Val, _Acc) -> + update_counter(key(Type, Metric), {2, Val}) + end, 0, Metrics), + erase('$metrics') + end. + %% @doc Metric key key(gauge, Metric) -> {Metric, 0}; diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index bb0d8bb0b..3dc73a5d5 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -28,7 +28,7 @@ name(2, _Ver) -> client_identifier_not_valid; name(3, _Ver) -> server_unavaliable; name(4, _Ver) -> malformed_username_or_password; name(5, _Ver) -> unauthorized_client; -name(I, _Ver) -> list_to_atom("unkown_connack" ++ integer_to_list(I)). +name(_, _Ver) -> unknown_error. name(16#00) -> success; name(16#01) -> granted_qos1; @@ -73,7 +73,7 @@ name(16#9F) -> connection_rate_exceeded; name(16#A0) -> maximum_connect_time; name(16#A1) -> subscription_identifiers_not_supported; name(16#A2) -> wildcard_subscriptions_not_supported; -name(Code) -> list_to_atom("unkown_reason_code" ++ integer_to_list(Code)). +name(_Code) -> unknown_error. text(16#00) -> <<"Success">>; text(16#01) -> <<"Granted QoS 1">>; @@ -118,7 +118,7 @@ text(16#9F) -> <<"Connection rate exceeded">>; text(16#A0) -> <<"Maximum connect time">>; text(16#A1) -> <<"Subscription Identifiers not supported">>; text(16#A2) -> <<"Wildcard Subscriptions not supported">>; -text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]). +text(_Code) -> <<"Unknown error">>. compat(connack, 16#80) -> ?CONNACK_PROTO_VER; compat(connack, 16#81) -> ?CONNACK_PROTO_VER; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index b1f2e783a..2ac656d56 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -54,7 +54,9 @@ mnesia(boot) -> {type, bag}, {ram_copies, [node()]}, {record_name, route}, - {attributes, record_info(fields, route)}]); + {attributes, record_info(fields, route)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]); mnesia(copy) -> ok = ekka_mnesia:copy_table(?ROUTE). @@ -137,7 +139,7 @@ pick(Topic) -> init([Pool, Id]) -> rand:seed(exsplus, erlang:timestamp()), gproc_pool:connect_worker(Pool, {Pool, Id}), - Batch = #batch{enabled = emqx_config:get_env(route_batch_delete, false), + Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false), pending = sets:new()}, {ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}. @@ -191,7 +193,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) -> - _ = del_direct_routes(Batch#batch.pending), + _ = del_direct_routes(sets:to_list(Batch#batch.pending)), {noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate}; handle_info(Info, State) -> diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 5431c9900..7bacddd4c 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -53,7 +53,8 @@ mnesia(boot) -> {type, set}, {ram_copies, [node()]}, {record_name, routing_node}, - {attributes, record_info(fields, routing_node)}]); + {attributes, record_info(fields, routing_node)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]); mnesia(copy) -> ok = ekka_mnesia:copy_table(?ROUTING_NODE). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b223dfdda..12718b9fc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -421,7 +421,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From, {ok, ensure_await_rel_timer(State1)} end; true -> - emqx_metrics:inc('messages/qos2/dropped'), + emqx_metrics:trans(inc, 'messages/qos2/dropped'), ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -432,7 +432,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In true -> {ok, acked(pubrec, PacketId, State)}; false -> - emqx_metrics:inc('packets/pubrec/missed'), + emqx_metrics:trans(inc, 'packets/pubrec/missed'), ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -443,7 +443,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {_Ts, AwaitingRel1} -> {ok, State#state{awaiting_rel = AwaitingRel1}}; error -> - emqx_metrics:inc('packets/pubrel/missed'), + emqx_metrics:trans(inc, 'packets/pubrel/missed'), ?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State), {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); @@ -502,7 +502,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} noreply(dequeue(acked(puback, PacketId, State))); false -> ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), - emqx_metrics:inc('packets/puback/missed'), + emqx_metrics:trans(inc, 'packets/puback/missed'), {noreply, State} end; @@ -513,7 +513,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight noreply(dequeue(acked(pubcomp, PacketId, State))); false -> ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), - emqx_metrics:inc('packets/pubcomp/missed'), + emqx_metrics:trans(inc, 'packets/pubcomp/missed'), {noreply, State} end; @@ -603,6 +603,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> + emqx_metrics:commit(), _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), @@ -734,7 +735,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, {publish, {PacketId, Msg}} -> case emqx_message:is_expired(Msg) of true -> - emqx_metrics:inc('messages/expired'), + emqx_metrics:trans(inc, 'messages/expired'), emqx_inflight:delete(PacketId, Inflight); false -> redeliver({PacketId, Msg}, State), @@ -774,7 +775,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, Ts) div 1000) of Age when Age >= Timeout -> - emqx_metrics:inc('messages/qos2/expired'), + emqx_metrics:trans(inc, 'messages/qos2/expired'), ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age -> diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index 659b3a92b..b503d71c8 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -69,9 +69,11 @@ init([]) -> {type, bag}, {ram_copies, [node()]}, {record_name, global_session}, - {attributes, record_info(fields, global_session)}]), + {attributes, record_info(fields, global_session)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), ok = ekka_mnesia:copy_table(?TAB), - ekka:monitor(membership), + _ = ekka:monitor(membership), {ok, #{}}. handle_call(Req, _From, State) -> diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index f5dfa93fe..569de9092 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -36,16 +36,21 @@ %% @doc Create or replicate trie tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> + %% Optimize + StoreProps = [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}], %% Trie table ok = ekka_mnesia:create_table(?TRIE, [ {ram_copies, [node()]}, {record_name, trie}, - {attributes, record_info(fields, trie)}]), + {attributes, record_info(fields, trie)}, + {storage_properties, StoreProps}]), %% Trie node table ok = ekka_mnesia:create_table(?TRIE_NODE, [ {ram_copies, [node()]}, {record_name, trie_node}, - {attributes, record_info(fields, trie_node)}]); + {attributes, record_info(fields, trie_node)}, + {storage_properties, StoreProps}]); mnesia(copy) -> %% Copy trie table diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 77d2c058f..180edc8f0 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -148,7 +148,7 @@ send_fun(WsPid) -> fun(Packet, Options) -> Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), - emqx_metrics:inc('bytes/sent', BinSize), + emqx_metrics:trans(inc, 'bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), put(send_cnt, get(send_cnt) + 1), WsPid ! {binary, iolist_to_binary(Data)}, @@ -167,7 +167,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, BinSize = iolist_size(Data), put(recv_oct, get(recv_oct) + BinSize), ?LOG(debug, "RECV ~p", [Data]), - emqx_metrics:inc('bytes/received', BinSize), + emqx_metrics:trans(inc, 'bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> {ok, State#state{parser_state = NewParserState}}; @@ -223,6 +223,7 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> websocket_info({timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + emqx_metrics:commit(), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {ok, State#state{stats_timer = undefined}, hibernate}; diff --git a/test/emqx_batch_SUITE.erl b/test/emqx_batch_SUITE.erl new file mode 100644 index 000000000..c4c69080b --- /dev/null +++ b/test/emqx_batch_SUITE.erl @@ -0,0 +1,50 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_batch_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [batch_full_commit, batch_linger_commit]. + +batch_full_commit(_) -> + B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 2000, commit_fun => fun(_) -> ok end}), + B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]), + ?assertEqual(3, emqx_batch:size(B3)), + ?assertEqual([a, b, c], emqx_batch:items(B3)), + %% Trigger commit fun. + B4 = emqx_batch:push(a, B3), + ?assertEqual(0, emqx_batch:size(B4)), + ?assertEqual([], emqx_batch:items(B4)). + +batch_linger_commit(_) -> + CommitFun = fun(Q) -> ?assertEqual(3, length(Q)) end, + B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 500, commit_fun => CommitFun}), + B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]), + ?assertEqual(3, emqx_batch:size(B3)), + ?assertEqual([a, b, c], emqx_batch:items(B3)), + receive + batch_linger_expired -> + B4 = emqx_batch:commit(B3), + ?assertEqual(0, emqx_batch:size(B4)), + ?assertEqual([], emqx_batch:items(B4)) + after + 1000 -> + error(linger_timer_not_triggered) + end. + diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 3187aafa4..b9bfb4257 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -38,6 +38,7 @@ groups() -> publish, pubsub, t_local_subscribe, t_shared_subscribe, + dispatch_with_no_sub, 'pubsub#', 'pubsub+']}, {session, [sequence], [start_session]}, {metrics, [sequence], [inc_dec_metric]}, @@ -76,6 +77,11 @@ publish(_) -> emqx:publish(Msg), ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). +dispatch_with_no_sub(_) -> + Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>), + Delivery = #delivery{sender = self(), message = Msg, results = []}, + ?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). + pubsub(_) -> true = emqx:is_running(node()), Self = self(), @@ -193,4 +199,3 @@ set_alarms(_) -> ?assertEqual(1, length(Alarms)), emqx_alarm_mgr:clear_alarm(<<"1">>), [] = emqx_alarm_mgr:get_alarms(). - diff --git a/test/emqx_metrics_SUITE.erl b/test/emqx_metrics_SUITE.erl index 8e601562c..ead7e98ba 100644 --- a/test/emqx_metrics_SUITE.erl +++ b/test/emqx_metrics_SUITE.erl @@ -19,21 +19,40 @@ -include("emqx_mqtt.hrl"). -all() -> [t_inc_dec_metrics]. +all() -> [t_inc_dec_metrics, t_trans]. t_inc_dec_metrics(_) -> {ok, _} = emqx_metrics:start_link(), {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, emqx_metrics:inc('bytes/received'), emqx_metrics:inc({counter, 'bytes/received'}, 2), - emqx_metrics:inc(counter, 'bytes/received', 2), + emqx_metrics:inc(counter, 'bytes/received', 1), + emqx_metrics:inc('bytes/received', 1), emqx_metrics:inc({gauge, 'messages/retained'}, 2), emqx_metrics:inc(gauge, 'messages/retained', 2), {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, emqx_metrics:dec(gauge, 'messages/retained'), emqx_metrics:dec(gauge, 'messages/retained', 1), 2 = emqx_metrics:val('messages/retained'), + emqx_metrics:set('messages/retained', 3), + 3 = emqx_metrics:val('messages/retained'), emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}), {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')}, emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}), {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}. + +t_trans(_) -> + {ok, _} = emqx_metrics:start_link(), + emqx_metrics:trans(inc, 'bytes/received'), + emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2), + emqx_metrics:trans(inc, counter, 'bytes/received', 2), + emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2), + emqx_metrics:trans(inc, gauge, 'messages/retained', 2), + {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, + emqx_metrics:commit(), + {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, + emqx_metrics:trans(dec, gauge, 'messages/retained'), + emqx_metrics:trans(dec, gauge, 'messages/retained', 1), + 4 = emqx_metrics:val('messages/retained'), + emqx_metrics:commit(), + 2 = emqx_metrics:val('messages/retained'). diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index a35da9c5d..e317ec7b3 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -64,7 +64,7 @@ add_del_route(_) -> ?R:del_route(From, <<"a/b/c">>, node()), ?R:del_route(From, <<"a/+/b">>, node()), - timer:sleep(1), + timer:sleep(120), ?assertEqual([], lists:sort(?R:topics())). match_routes(_) -> diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 2bae869b6..a04a0b82b 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -45,6 +45,7 @@ ignore_loop(_Config) -> application:set_env(emqx, mqtt_ignore_loop_deliver, false). t_session_all(_) -> + emqx_zone:set_env(internal, idle_timeout, 100), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),