Fix conflict

This commit is contained in:
周子博 2018-11-30 19:51:59 +08:00
commit 76ced677ee
29 changed files with 305 additions and 71 deletions

View File

@ -2,25 +2,24 @@
PROJECT = emqx PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker PROJECT_DESCRIPTION = EMQ X Broker
PROJECT_VERSION = 3.0
DEPS = jsx gproc gen_rpc ekka esockd cowboy clique emqx_passwd DEPS = jsx gproc gen_rpc ekka esockd cowboy clique emqx_passwd
dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_jsx = hex-emqx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0 dep_gproc = hex-emqx 0.8.0
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0
dep_esockd = git https://github.com/emqx/esockd v5.4.2 dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.2
dep_ekka = git https://github.com/emqx/ekka v0.5.1 dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_cowboy = hex-emqx 2.4.0
dep_clique = git https://github.com/emqx/clique develop dep_clique = git-emqx https://github.com/emqx/clique develop
dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30
NO_AUTOPATCH = cuttlefish NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info -DAPPLICATION=emqx ERLC_OPTS += +debug_info -DAPPLICATION=emqx
BUILD_DEPS = cuttlefish BUILD_DEPS = cuttlefish
dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.1
dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.1.1
#TEST_DEPS = emqx_ct_helplers #TEST_DEPS = emqx_ct_helplers
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
@ -37,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch
CT_NODE_NAME = emqxct@127.0.0.1 CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
@ -48,9 +48,32 @@ PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets pu
DIALYZER_DIRS := ebin/ DIALYZER_DIRS := ebin/
DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
GIT_VSN := $(shell git --version | grep -oE "[0-9]{1,2}\.[0-9]{1,2}")
GIT_VSN_17_COMP := $(shell echo -e "$(GIT_VSN)\n1.7" | sort -V | tail -1)
ifeq ($(GIT_VSN_17_COMP),1.7)
define dep_fetch_git-emqx
git clone -q -n -- $(call dep_repo,$(1)) $(DEPS_DIR)/$(call dep_name,$(1)); \
cd $(DEPS_DIR)/$(call dep_name,$(1)) && git checkout -q $(call dep_commit,$(1))
endef
else
define dep_fetch_git-emqx
git clone -q -c advice.detachedHead=false --depth 1 -b $(call dep_commit,$(1)) -- $(call dep_repo,$(1)) $(DEPS_DIR)/$(call dep_name,$(1))
endef
endif
core_http_get-emqx = curl -Lf$(if $(filter-out 0,$(V)),,s)o $(call core_native_path,$1) $2
define dep_fetch_hex-emqx
mkdir -p $(ERLANG_MK_TMP)/hex $(DEPS_DIR)/$1; \
$(call core_http_get-emqx,$(ERLANG_MK_TMP)/hex/$1.tar,\
https://repo.hex.pm/tarballs/$1-$(strip $(word 2,$(dep_$1))).tar); \
tar -xOf $(ERLANG_MK_TMP)/hex/$1.tar contents.tar.gz | tar -C $(DEPS_DIR)/$1 -xzf -;
endef
include erlang.mk include erlang.mk
clean:: gen-clean rebar-clean clean:: gen-clean
.PHONY: gen-clean .PHONY: gen-clean
gen-clean: gen-clean:
@ -109,7 +132,7 @@ rebar-ct: app.config
rebar-clean: rebar-clean:
@rebar3 clean @rebar3 clean
distclean:: rebar-clean distclean::
@rm -rf _build cover deps logs log data @rm -rf _build cover deps logs log data
@rm -f rebar.lock compile_commands.json cuttlefish @rm -f rebar.lock compile_commands.json cuttlefish
@ -120,7 +143,7 @@ comma = ,
quote = \" quote = \"
curly_l = "{" curly_l = "{"
curly_r = "}" curly_r = "}"
dep-versions = [$(foreach dep,$(DEPS) $(BUILD_DEPS),$(curly_l)$(dep),$(quote)$(word 3,$(dep_$(dep)))$(quote)$(curly_r)$(comma))[]] dep-versions = [$(foreach dep,$(DEPS) $(BUILD_DEPS),$(curly_l)$(dep),$(quote)$(word $(words $(dep_$(dep))),$(dep_$(dep)))$(quote)$(curly_r)$(comma))[]]
.PHONY: dep-vsn-check .PHONY: dep-vsn-check
dep-vsn-check: dep-vsn-check:

View File

@ -63,6 +63,15 @@ You can reach the EMQ community and developers via the following channels:
Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues). Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues).
## MQTT Specifications
You can read the mqtt protocol via the following links:
[MQTT Version 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
[MQTT Version 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html)
[MQTT SN](http://mqtt.org/new/wp-content/uploads/2009/06/MQTT-SN_spec_v1.2.pdf)
## License ## License
@ -74,4 +83,3 @@ Licensed under the Apache License, Version 2.0 (the "License");you may not use t
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License. See the License for the specific language governing permissions and limitations under the License.

View File

Binary file not shown.

View File

@ -1,11 +0,0 @@
http://emqttd.io/docs/v2/
or
http://docs.emqtt.com/
or
http://emqttd-docs.rtfd.org

Binary file not shown.

Binary file not shown.

2
erlang.mk vendored
View File

@ -1186,7 +1186,7 @@ else
fi fi
$(appsrc_verbose) cat src/$(PROJECT).app.src \ $(appsrc_verbose) cat src/$(PROJECT).app.src \
| sed "s/{[[:space:]]*modules[[:space:]]*,[[:space:]]*\[\]}/{modules, \[$(call comma_list,$(MODULES))\]}/" \ | sed "s/{[[:space:]]*modules[[:space:]]*,[[:space:]]*\[\]}/{modules, \[$(call comma_list,$(MODULES))\]}/" \
| sed "s/{id,[[:space:]]*\"git\"}/{id, \"$(subst /,\/,$(GITDESCRIBE))\"}/" \ | sed "s/{vsn,[[:space:]]*\"git\"}/{vsn, \"$(subst /,\/,$(GITDESCRIBE))\"}/" \
> ebin/$(PROJECT).app > ebin/$(PROJECT).app
endif endif

View File

@ -1929,7 +1929,7 @@ broker.shared_dispatch_ack_enabled = false
## Enable batch clean for deleted routes. ## Enable batch clean for deleted routes.
## ##
## Value: Flag ## Value: Flag
broker.route_batch_clean = on broker.route_batch_clean = off
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## System Monitor ## System Monitor

View File

@ -426,12 +426,18 @@ end}.
{datatype, file} {datatype, file}
]}. ]}.
{mapping, "sasl", "sasl.sasl_error_logger", [ {mapping, "log.sasl", "sasl.sasl_error_logger", [
{default, off}, {default, off},
{datatype, flag}, {datatype, flag},
hidden hidden
]}. ]}.
{mapping, "log.error_logger", "kernel.error_logger", [
{default, silent},
{datatype, {enum, [silent]}},
hidden
]}.
{translation, "emqx.primary_log_level", fun(Conf) -> {translation, "emqx.primary_log_level", fun(Conf) ->
cuttlefish:conf_get("log.level", Conf) cuttlefish:conf_get("log.level", Conf)
end}. end}.

View File

@ -1,4 +1,12 @@
CONFIG0 = case os:getenv("REBAR_GIT_CLONE_OPTIONS") of
"--depth 1" ->
CONFIG;
_ ->
os:putenv("REBAR_GIT_CLONE_OPTIONS", "--depth 1"),
CONFIG
end,
CONFIG1 = case os:getenv("TRAVIS") of CONFIG1 = case os:getenv("TRAVIS") of
"true" -> "true" ->
JobId = os:getenv("TRAVIS_JOB_ID"), JobId = os:getenv("TRAVIS_JOB_ID"),
@ -7,7 +15,7 @@ CONFIG1 = case os:getenv("TRAVIS") of
{coveralls_service_name , "travis-ci"} | CONFIG]; {coveralls_service_name , "travis-ci"} | CONFIG];
_ -> _ ->
CONFIG CONFIG
end, end,
{_, Deps} = lists:keyfind(deps, 1, CONFIG1), {_, Deps} = lists:keyfind(deps, 1, CONFIG1),
{_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1), {_, OurDeps} = lists:keyfind(github_emqx_deps, 1, CONFIG1),

View File

@ -1,6 +1,6 @@
{application,emqx, {application,emqx,
[{description,"EMQ X Broker"}, [{description,"EMQ X Broker"},
{vsn,"3.0-rc.3"}, {vsn,"git"},
{modules,[]}, {modules,[]},
{registered,[emqx_sup]}, {registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd, {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,

View File

@ -43,7 +43,8 @@ mnesia(boot) ->
{type, set}, {type, set},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, banned}, {record_name, banned},
{attributes, record_info(fields, banned)}]); {attributes, record_info(fields, banned)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]);
mnesia(copy) -> mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB). ok = ekka_mnesia:copy_table(?TAB).

74
src/emqx_batch.erl Normal file
View File

@ -0,0 +1,74 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_batch).
-export([init/1, push/2, commit/1]).
-export([size/1, items/1]).
-type(options() :: #{
batch_size => non_neg_integer(),
linger_ms => pos_integer(),
commit_fun := function()
}).
-export_type([options/0]).
-record(batch, {
batch_size :: non_neg_integer(),
batch_q :: list(any()),
linger_ms :: pos_integer(),
linger_timer :: reference() | undefined,
commit_fun :: function()
}).
-type(batch() :: #batch{}).
-export_type([batch/0]).
-spec(init(options()) -> batch()).
init(Opts) when is_map(Opts) ->
#batch{batch_size = maps:get(batch_size, Opts, 1000),
batch_q = [],
linger_ms = maps:get(linger_ms, Opts, 1000),
commit_fun = maps:get(commit_fun, Opts)}.
-spec(push(any(), batch()) -> batch()).
push(El, Batch = #batch{batch_q = Q, linger_ms = Ms, linger_timer = undefined}) when length(Q) == 0 ->
Batch#batch{batch_q = [El], linger_timer = erlang:send_after(Ms, self(), batch_linger_expired)};
%% no limit.
push(El, Batch = #batch{batch_size = 0, batch_q = Q}) ->
Batch#batch{batch_q = [El|Q]};
push(El, Batch = #batch{batch_size = MaxSize, batch_q = Q}) when length(Q) >= MaxSize ->
commit(Batch#batch{batch_q = [El|Q]});
push(El, Batch = #batch{batch_q = Q}) ->
Batch#batch{batch_q = [El|Q]}.
-spec(commit(batch()) -> batch()).
commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) ->
_ = Commit(lists:reverse(Q)),
reset(Batch).
reset(Batch = #batch{linger_timer = TRef}) ->
_ = emqx_misc:cancel_timer(TRef),
Batch#batch{batch_q = [], linger_timer = undefined}.
-spec(size(batch()) -> non_neg_integer()).
size(#batch{batch_q = Q}) ->
length(Q).
-spec(items(batch()) -> list(any())).
items(#batch{batch_q = Q}) ->
lists:reverse(Q).

View File

@ -33,6 +33,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-record(state, {pool, id, submap, submon}). -record(state, {pool, id, submap, submon}).
-record(subscribe, {topic, subpid, subid, subopts = #{}}). -record(subscribe, {topic, subpid, subid, subopts = #{}}).
-record(unsubscribe, {topic, subpid, subid}). -record(unsubscribe, {topic, subpid, subid}).
@ -327,8 +332,6 @@ handle_call(Req, _From, State) ->
emqx_logger:error("[Broker] unexpected call: ~p", [Req]), emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
Subscriber = {SubPid, SubId}, Subscriber = {SubPid, SubId},
case ets:member(?SUBOPTION, {Topic, Subscriber}) of case ets:member(?SUBOPTION, {Topic, Subscriber}) of

View File

@ -170,7 +170,7 @@ send_fun(Transport, Socket) ->
Data = emqx_frame:serialize(Packet, Options), Data = emqx_frame:serialize(Packet, Options),
try Transport:async_send(Socket, Data) of try Transport:async_send(Socket, Data) of
ok -> ok ->
emqx_metrics:inc('bytes/sent', iolist_size(Data)), emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
ok; ok;
Error -> Error Error -> Error
catch catch
@ -215,6 +215,7 @@ handle_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, State = #state{stats_timer = Timer,
proto_state = ProtoState proto_state = ProtoState
}) -> }) ->
emqx_metrics:commit(),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
NewState = State#state{stats_timer = undefined}, NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy), Limits = erlang:get(force_shutdown_policy),
@ -248,7 +249,7 @@ handle_info(activate_sock, State) ->
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
Size = iolist_size(Data), Size = iolist_size(Data),
emqx_metrics:inc('bytes/received', Size), emqx_metrics:trans(inc, 'bytes/received', Size),
Incoming = #{bytes => Size, packets => 0}, Incoming = #{bytes => Size, packets => 0},
handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); handle_packet(Data, State#state{await_recv = false, incoming = Incoming});

View File

@ -19,6 +19,7 @@
-export([start_link/0]). -export([start_link/0]).
-export([new/1, all/0]). -export([new/1, all/0]).
-export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). -export([val/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]).
-export([trans/2, trans/3, trans/4, commit/0]).
%% Received/sent metrics %% Received/sent metrics
-export([received/1, sent/1]). -export([received/1, sent/1]).
@ -133,10 +134,8 @@ inc(Metric, Val) when is_atom(Metric) ->
%% @doc Increase metric value %% @doc Increase metric value
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()). -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
inc(gauge, Metric, Val) -> inc(Type, Metric, Val) ->
update_counter(key(gauge, Metric), {2, Val}); update_counter(key(Type, Metric), {2, Val}).
inc(counter, Metric, Val) ->
update_counter(key(counter, Metric), {2, Val}).
%% @doc Decrease metric value %% @doc Decrease metric value
-spec(dec(gauge, atom()) -> integer()). -spec(dec(gauge, atom()) -> integer()).
@ -154,6 +153,41 @@ set(Metric, Val) when is_atom(Metric) ->
set(gauge, Metric, Val) -> set(gauge, Metric, Val) ->
ets:insert(?TAB, {key(gauge, Metric), Val}). ets:insert(?TAB, {key(gauge, Metric), Val}).
trans(inc, Metric) ->
trans(inc, {counter, Metric}, 1).
trans(Opt, {gauge, Metric}, Val) ->
trans(Opt, gauge, Metric, Val);
trans(inc, {counter, Metric}, Val) ->
trans(inc, counter, Metric, Val);
trans(inc, Metric, Val) when is_atom(Metric) ->
trans(inc, counter, Metric, Val);
trans(dec, gauge, Metric) ->
trans(dec, gauge, Metric, 1).
trans(inc, Type, Metric, Val) ->
hold(Type, Metric, Val);
trans(dec, gauge, Metric, Val) ->
hold(gauge, Metric, -Val).
hold(Type, Metric, Val) when Type =:= counter orelse Type =:= gauge ->
put('$metrics', case get('$metrics') of
undefined ->
#{{Type, Metric} => Val};
Metrics ->
maps:update_with({Type, Metric}, fun(Cnt) -> Cnt + Val end, Val, Metrics)
end).
commit() ->
case get('$metrics') of
undefined -> ok;
Metrics ->
maps:fold(fun({Type, Metric}, Val, _Acc) ->
update_counter(key(Type, Metric), {2, Val})
end, 0, Metrics),
erase('$metrics')
end.
%% @doc Metric key %% @doc Metric key
key(gauge, Metric) -> key(gauge, Metric) ->
{Metric, 0}; {Metric, 0};

View File

@ -28,7 +28,7 @@ name(2, _Ver) -> client_identifier_not_valid;
name(3, _Ver) -> server_unavaliable; name(3, _Ver) -> server_unavaliable;
name(4, _Ver) -> malformed_username_or_password; name(4, _Ver) -> malformed_username_or_password;
name(5, _Ver) -> unauthorized_client; name(5, _Ver) -> unauthorized_client;
name(I, _Ver) -> list_to_atom("unkown_connack" ++ integer_to_list(I)). name(_, _Ver) -> unknown_error.
name(16#00) -> success; name(16#00) -> success;
name(16#01) -> granted_qos1; name(16#01) -> granted_qos1;
@ -73,7 +73,7 @@ name(16#9F) -> connection_rate_exceeded;
name(16#A0) -> maximum_connect_time; name(16#A0) -> maximum_connect_time;
name(16#A1) -> subscription_identifiers_not_supported; name(16#A1) -> subscription_identifiers_not_supported;
name(16#A2) -> wildcard_subscriptions_not_supported; name(16#A2) -> wildcard_subscriptions_not_supported;
name(Code) -> list_to_atom("unkown_reason_code" ++ integer_to_list(Code)). name(_Code) -> unknown_error.
text(16#00) -> <<"Success">>; text(16#00) -> <<"Success">>;
text(16#01) -> <<"Granted QoS 1">>; text(16#01) -> <<"Granted QoS 1">>;
@ -118,7 +118,7 @@ text(16#9F) -> <<"Connection rate exceeded">>;
text(16#A0) -> <<"Maximum connect time">>; text(16#A0) -> <<"Maximum connect time">>;
text(16#A1) -> <<"Subscription Identifiers not supported">>; text(16#A1) -> <<"Subscription Identifiers not supported">>;
text(16#A2) -> <<"Wildcard Subscriptions not supported">>; text(16#A2) -> <<"Wildcard Subscriptions not supported">>;
text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]). text(_Code) -> <<"Unknown error">>.
compat(connack, 16#80) -> ?CONNACK_PROTO_VER; compat(connack, 16#80) -> ?CONNACK_PROTO_VER;
compat(connack, 16#81) -> ?CONNACK_PROTO_VER; compat(connack, 16#81) -> ?CONNACK_PROTO_VER;

View File

@ -54,7 +54,9 @@ mnesia(boot) ->
{type, bag}, {type, bag},
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, route}, {record_name, route},
{attributes, record_info(fields, route)}]); {attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]);
mnesia(copy) -> mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTE). ok = ekka_mnesia:copy_table(?ROUTE).
@ -137,7 +139,7 @@ pick(Topic) ->
init([Pool, Id]) -> init([Pool, Id]) ->
rand:seed(exsplus, erlang:timestamp()), rand:seed(exsplus, erlang:timestamp()),
gproc_pool:connect_worker(Pool, {Pool, Id}), gproc_pool:connect_worker(Pool, {Pool, Id}),
Batch = #batch{enabled = emqx_config:get_env(route_batch_delete, false), Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false),
pending = sets:new()}, pending = sets:new()},
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}. {ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
@ -191,7 +193,7 @@ handle_cast(Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) -> handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
_ = del_direct_routes(Batch#batch.pending), _ = del_direct_routes(sets:to_list(Batch#batch.pending)),
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate}; {noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->

View File

@ -53,7 +53,8 @@ mnesia(boot) ->
{type, set}, {type, set},
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, routing_node}, {record_name, routing_node},
{attributes, record_info(fields, routing_node)}]); {attributes, record_info(fields, routing_node)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]);
mnesia(copy) -> mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTING_NODE). ok = ekka_mnesia:copy_table(?ROUTING_NODE).

View File

@ -421,7 +421,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
{ok, ensure_await_rel_timer(State1)} {ok, ensure_await_rel_timer(State1)}
end; end;
true -> true ->
emqx_metrics:inc('messages/qos2/dropped'), emqx_metrics:trans(inc, 'messages/qos2/dropped'),
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end); end);
@ -432,7 +432,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
true -> true ->
{ok, acked(pubrec, PacketId, State)}; {ok, acked(pubrec, PacketId, State)};
false -> false ->
emqx_metrics:inc('packets/pubrec/missed'), emqx_metrics:trans(inc, 'packets/pubrec/missed'),
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end); end);
@ -443,7 +443,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{_Ts, AwaitingRel1} -> {_Ts, AwaitingRel1} ->
{ok, State#state{awaiting_rel = AwaitingRel1}}; {ok, State#state{awaiting_rel = AwaitingRel1}};
error -> error ->
emqx_metrics:inc('packets/pubrel/missed'), emqx_metrics:trans(inc, 'packets/pubrel/missed'),
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State), ?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end); end);
@ -502,7 +502,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
noreply(dequeue(acked(puback, PacketId, State))); noreply(dequeue(acked(puback, PacketId, State)));
false -> false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
emqx_metrics:inc('packets/puback/missed'), emqx_metrics:trans(inc, 'packets/puback/missed'),
{noreply, State} {noreply, State}
end; end;
@ -513,7 +513,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
noreply(dequeue(acked(pubcomp, PacketId, State))); noreply(dequeue(acked(pubcomp, PacketId, State)));
false -> false ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
emqx_metrics:inc('packets/pubcomp/missed'), emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
{noreply, State} {noreply, State}
end; end;
@ -603,6 +603,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
handle_info({timeout, Timer, emit_stats}, handle_info({timeout, Timer, emit_stats},
State = #state{client_id = ClientId, State = #state{client_id = ClientId,
stats_timer = Timer}) -> stats_timer = Timer}) ->
emqx_metrics:commit(),
_ = emqx_sm:set_session_stats(ClientId, stats(State)), _ = emqx_sm:set_session_stats(ClientId, stats(State)),
NewState = State#state{stats_timer = undefined}, NewState = State#state{stats_timer = undefined},
Limits = erlang:get(force_shutdown_policy), Limits = erlang:get(force_shutdown_policy),
@ -734,7 +735,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
{publish, {PacketId, Msg}} -> {publish, {PacketId, Msg}} ->
case emqx_message:is_expired(Msg) of case emqx_message:is_expired(Msg) of
true -> true ->
emqx_metrics:inc('messages/expired'), emqx_metrics:trans(inc, 'messages/expired'),
emqx_inflight:delete(PacketId, Inflight); emqx_inflight:delete(PacketId, Inflight);
false -> false ->
redeliver({PacketId, Msg}, State), redeliver({PacketId, Msg}, State),
@ -774,7 +775,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, Ts) div 1000) of case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout -> Age when Age >= Timeout ->
emqx_metrics:inc('messages/qos2/expired'), emqx_metrics:trans(inc, 'messages/qos2/expired'),
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State), ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
Age -> Age ->

View File

@ -69,9 +69,11 @@ init([]) ->
{type, bag}, {type, bag},
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, global_session}, {record_name, global_session},
{attributes, record_info(fields, global_session)}]), {attributes, record_info(fields, global_session)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(?TAB), ok = ekka_mnesia:copy_table(?TAB),
ekka:monitor(membership), _ = ekka:monitor(membership),
{ok, #{}}. {ok, #{}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->

View File

@ -36,16 +36,21 @@
%% @doc Create or replicate trie tables. %% @doc Create or replicate trie tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
%% Optimize
StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true}]}],
%% Trie table %% Trie table
ok = ekka_mnesia:create_table(?TRIE, [ ok = ekka_mnesia:create_table(?TRIE, [
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, trie}, {record_name, trie},
{attributes, record_info(fields, trie)}]), {attributes, record_info(fields, trie)},
{storage_properties, StoreProps}]),
%% Trie node table %% Trie node table
ok = ekka_mnesia:create_table(?TRIE_NODE, [ ok = ekka_mnesia:create_table(?TRIE_NODE, [
{ram_copies, [node()]}, {ram_copies, [node()]},
{record_name, trie_node}, {record_name, trie_node},
{attributes, record_info(fields, trie_node)}]); {attributes, record_info(fields, trie_node)},
{storage_properties, StoreProps}]);
mnesia(copy) -> mnesia(copy) ->
%% Copy trie table %% Copy trie table

View File

@ -148,7 +148,7 @@ send_fun(WsPid) ->
fun(Packet, Options) -> fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options), Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/sent', BinSize), emqx_metrics:trans(inc, 'bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize), put(send_oct, get(send_oct) + BinSize),
put(send_cnt, get(send_cnt) + 1), put(send_cnt, get(send_cnt) + 1),
WsPid ! {binary, iolist_to_binary(Data)}, WsPid ! {binary, iolist_to_binary(Data)},
@ -167,7 +167,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
put(recv_oct, get(recv_oct) + BinSize), put(recv_oct, get(recv_oct) + BinSize),
?LOG(debug, "RECV ~p", [Data]), ?LOG(debug, "RECV ~p", [Data]),
emqx_metrics:inc('bytes/received', BinSize), emqx_metrics:trans(inc, 'bytes/received', BinSize),
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
{more, NewParserState} -> {more, NewParserState} ->
{ok, State#state{parser_state = NewParserState}}; {ok, State#state{parser_state = NewParserState}};
@ -223,6 +223,7 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
websocket_info({timeout, Timer, emit_stats}, websocket_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) -> State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_metrics:commit(),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{ok, State#state{stats_timer = undefined}, hibernate}; {ok, State#state{stats_timer = undefined}, hibernate};

50
test/emqx_batch_SUITE.erl Normal file
View File

@ -0,0 +1,50 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_batch_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
[batch_full_commit, batch_linger_commit].
batch_full_commit(_) ->
B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 2000, commit_fun => fun(_) -> ok end}),
B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]),
?assertEqual(3, emqx_batch:size(B3)),
?assertEqual([a, b, c], emqx_batch:items(B3)),
%% Trigger commit fun.
B4 = emqx_batch:push(a, B3),
?assertEqual(0, emqx_batch:size(B4)),
?assertEqual([], emqx_batch:items(B4)).
batch_linger_commit(_) ->
CommitFun = fun(Q) -> ?assertEqual(3, length(Q)) end,
B0 = emqx_batch:init(#{batch_size => 3, linger_ms => 500, commit_fun => CommitFun}),
B3 = lists:foldl(fun(E, B) -> emqx_batch:push(E, B) end, B0, [a, b, c]),
?assertEqual(3, emqx_batch:size(B3)),
?assertEqual([a, b, c], emqx_batch:items(B3)),
receive
batch_linger_expired ->
B4 = emqx_batch:commit(B3),
?assertEqual(0, emqx_batch:size(B4)),
?assertEqual([], emqx_batch:items(B4))
after
1000 ->
error(linger_timer_not_triggered)
end.

View File

@ -38,6 +38,7 @@ groups() ->
publish, pubsub, publish, pubsub,
t_local_subscribe, t_local_subscribe,
t_shared_subscribe, t_shared_subscribe,
dispatch_with_no_sub,
'pubsub#', 'pubsub+']}, 'pubsub#', 'pubsub+']},
{session, [sequence], [start_session]}, {session, [sequence], [start_session]},
{metrics, [sequence], [inc_dec_metric]}, {metrics, [sequence], [inc_dec_metric]},
@ -76,6 +77,11 @@ publish(_) ->
emqx:publish(Msg), emqx:publish(Msg),
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
dispatch_with_no_sub(_) ->
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
Delivery = #delivery{sender = self(), message = Msg, results = []},
?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
pubsub(_) -> pubsub(_) ->
true = emqx:is_running(node()), true = emqx:is_running(node()),
Self = self(), Self = self(),
@ -193,4 +199,3 @@ set_alarms(_) ->
?assertEqual(1, length(Alarms)), ?assertEqual(1, length(Alarms)),
emqx_alarm_mgr:clear_alarm(<<"1">>), emqx_alarm_mgr:clear_alarm(<<"1">>),
[] = emqx_alarm_mgr:get_alarms(). [] = emqx_alarm_mgr:get_alarms().

View File

@ -19,21 +19,40 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
all() -> [t_inc_dec_metrics]. all() -> [t_inc_dec_metrics, t_trans].
t_inc_dec_metrics(_) -> t_inc_dec_metrics(_) ->
{ok, _} = emqx_metrics:start_link(), {ok, _} = emqx_metrics:start_link(),
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, {0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:inc('bytes/received'), emqx_metrics:inc('bytes/received'),
emqx_metrics:inc({counter, 'bytes/received'}, 2), emqx_metrics:inc({counter, 'bytes/received'}, 2),
emqx_metrics:inc(counter, 'bytes/received', 2), emqx_metrics:inc(counter, 'bytes/received', 1),
emqx_metrics:inc('bytes/received', 1),
emqx_metrics:inc({gauge, 'messages/retained'}, 2), emqx_metrics:inc({gauge, 'messages/retained'}, 2),
emqx_metrics:inc(gauge, 'messages/retained', 2), emqx_metrics:inc(gauge, 'messages/retained', 2),
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')}, {5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:dec(gauge, 'messages/retained'), emqx_metrics:dec(gauge, 'messages/retained'),
emqx_metrics:dec(gauge, 'messages/retained', 1), emqx_metrics:dec(gauge, 'messages/retained', 1),
2 = emqx_metrics:val('messages/retained'), 2 = emqx_metrics:val('messages/retained'),
emqx_metrics:set('messages/retained', 3),
3 = emqx_metrics:val('messages/retained'),
emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}), emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
{1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')}, {1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}), emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
{1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}. {1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.
t_trans(_) ->
{ok, _} = emqx_metrics:start_link(),
emqx_metrics:trans(inc, 'bytes/received'),
emqx_metrics:trans(inc, {counter, 'bytes/received'}, 2),
emqx_metrics:trans(inc, counter, 'bytes/received', 2),
emqx_metrics:trans(inc, {gauge, 'messages/retained'}, 2),
emqx_metrics:trans(inc, gauge, 'messages/retained', 2),
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:commit(),
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:trans(dec, gauge, 'messages/retained'),
emqx_metrics:trans(dec, gauge, 'messages/retained', 1),
4 = emqx_metrics:val('messages/retained'),
emqx_metrics:commit(),
2 = emqx_metrics:val('messages/retained').

View File

@ -64,7 +64,7 @@ add_del_route(_) ->
?R:del_route(From, <<"a/b/c">>, node()), ?R:del_route(From, <<"a/b/c">>, node()),
?R:del_route(From, <<"a/+/b">>, node()), ?R:del_route(From, <<"a/+/b">>, node()),
timer:sleep(1), timer:sleep(120),
?assertEqual([], lists:sort(?R:topics())). ?assertEqual([], lists:sort(?R:topics())).
match_routes(_) -> match_routes(_) ->

View File

@ -45,6 +45,7 @@ ignore_loop(_Config) ->
application:set_env(emqx, mqtt_ignore_loop_deliver, false). application:set_env(emqx, mqtt_ignore_loop_deliver, false).
t_session_all(_) -> t_session_all(_) ->
emqx_zone:set_env(internal, idle_timeout, 100),
ClientId = <<"ClientId">>, ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),