Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
b20e87f98e
2
Makefile
2
Makefile
|
@ -37,7 +37,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
|
|||
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
|
||||
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
|
||||
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
|
||||
emqx_packet emqx_connection emqx_tracer emqx_sys_mon
|
||||
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
linger_ms => pos_integer(),
|
||||
commit_fun := function()
|
||||
}).
|
||||
|
||||
-export_type([options/0]).
|
||||
|
||||
-record(batch, {
|
||||
|
@ -31,7 +32,9 @@
|
|||
linger_timer :: reference() | undefined,
|
||||
commit_fun :: function()
|
||||
}).
|
||||
-type(batch() :: #batch{}).
|
||||
|
||||
-opaque(batch() :: #batch{}).
|
||||
|
||||
-export_type([batch/0]).
|
||||
|
||||
-spec(init(options()) -> batch()).
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
-type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE,
|
||||
version => emqx_mqtt_types:version()}).
|
||||
|
||||
-type(parse_state() :: {none, options()} | cont_fun(binary())).
|
||||
-opaque(parse_state() :: {none, options()} | cont_fun(binary())).
|
||||
|
||||
-type(cont_fun(Bin) :: fun((Bin) -> {ok, emqx_mqtt_types:packet(), binary()}
|
||||
| {more, cont_fun(Bin)})).
|
||||
|
|
|
@ -31,7 +31,11 @@
|
|||
-type(st() :: #{cnt => {integer(), integer()},
|
||||
oct => {integer(), integer()}}).
|
||||
|
||||
-type(gc_state() :: {?MODULE, st()}).
|
||||
-opaque(gc_state() :: {?MODULE, st()}).
|
||||
|
||||
-export_type([gc_state/0]).
|
||||
|
||||
-define(GCS(St), {?MODULE, St}).
|
||||
|
||||
-define(disabled, disabled).
|
||||
-define(ENABLED(X), (is_integer(X) andalso X > 0)).
|
||||
|
@ -41,14 +45,15 @@
|
|||
init(#{count := Count, bytes := Bytes}) ->
|
||||
Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
|
||||
Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
|
||||
{?MODULE, maps:from_list(Cnt ++ Oct)};
|
||||
?GCS(maps:from_list(Cnt ++ Oct));
|
||||
init(false) -> undefined.
|
||||
|
||||
%% @doc Try to run GC based on reduntions of count or bytes.
|
||||
-spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}).
|
||||
run(Cnt, Oct, {?MODULE, St}) ->
|
||||
-spec(run(pos_integer(), pos_integer(), gc_state())
|
||||
-> {boolean(), gc_state()}).
|
||||
run(Cnt, Oct, ?GCS(St)) ->
|
||||
{Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St),
|
||||
{Res, {?MODULE, St1}};
|
||||
{Res, ?GCS(St1)};
|
||||
run(_Cnt, _Oct, undefined) ->
|
||||
{false, undefined}.
|
||||
|
||||
|
@ -64,15 +69,15 @@ run([{K, N}|T], St) ->
|
|||
|
||||
%% @doc Info of GC state.
|
||||
-spec(info(gc_state()) -> maybe(map())).
|
||||
info({?MODULE, St}) ->
|
||||
info(?GCS(St)) ->
|
||||
St;
|
||||
info(undefined) ->
|
||||
undefined.
|
||||
|
||||
%% @doc Reset counters to zero.
|
||||
-spec(reset(gc_state()) -> gc_state()).
|
||||
reset({?MODULE, St}) ->
|
||||
{?MODULE, do_reset(St)};
|
||||
reset(?GCS(St)) ->
|
||||
?GCS(do_reset(St));
|
||||
reset(undefined) ->
|
||||
undefined.
|
||||
|
||||
|
|
|
@ -14,11 +14,15 @@
|
|||
|
||||
-module(emqx_inflight).
|
||||
|
||||
-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, values/1,
|
||||
to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]).
|
||||
-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2,
|
||||
values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]).
|
||||
|
||||
-type(key() :: term()).
|
||||
-type(max_size() :: pos_integer()).
|
||||
-type(inflight() :: {?MODULE, max_size(), gb_trees:tree()}).
|
||||
-opaque(inflight() :: {?MODULE, max_size(), gb_trees:tree()}).
|
||||
|
||||
-define(Inflight(Tree), {?MODULE, _MaxSize, Tree}).
|
||||
-define(Inflight(MaxSize, Tree), {?MODULE, MaxSize, (Tree)}).
|
||||
|
||||
-export_type([inflight/0]).
|
||||
|
||||
|
@ -26,68 +30,68 @@
|
|||
new(MaxSize) when MaxSize >= 0 ->
|
||||
{?MODULE, MaxSize, gb_trees:empty()}.
|
||||
|
||||
-spec(contain(Key :: term(), inflight()) -> boolean()).
|
||||
contain(Key, {?MODULE, _MaxSize, Tree}) ->
|
||||
-spec(contain(key(), inflight()) -> boolean()).
|
||||
contain(Key, ?Inflight(Tree)) ->
|
||||
gb_trees:is_defined(Key, Tree).
|
||||
|
||||
-spec(lookup(Key :: term(), inflight()) -> {value, term()} | none).
|
||||
lookup(Key, {?MODULE, _MaxSize, Tree}) ->
|
||||
-spec(lookup(key(), inflight()) -> {value, term()} | none).
|
||||
lookup(Key, ?Inflight(Tree)) ->
|
||||
gb_trees:lookup(Key, Tree).
|
||||
|
||||
-spec(insert(Key :: term(), Value :: term(), inflight()) -> inflight()).
|
||||
insert(Key, Value, {?MODULE, MaxSize, Tree}) ->
|
||||
{?MODULE, MaxSize, gb_trees:insert(Key, Value, Tree)}.
|
||||
-spec(insert(key(), Val :: term(), inflight()) -> inflight()).
|
||||
insert(Key, Val, ?Inflight(MaxSize, Tree)) ->
|
||||
?Inflight(MaxSize, gb_trees:insert(Key, Val, Tree)).
|
||||
|
||||
-spec(delete(Key :: term(), inflight()) -> inflight()).
|
||||
delete(Key, {?MODULE, MaxSize, Tree}) ->
|
||||
{?MODULE, MaxSize, gb_trees:delete(Key, Tree)}.
|
||||
-spec(delete(key(), inflight()) -> inflight()).
|
||||
delete(Key, ?Inflight(MaxSize, Tree)) ->
|
||||
?Inflight(MaxSize, gb_trees:delete(Key, Tree)).
|
||||
|
||||
-spec(update(Key :: term(), Val :: term(), inflight()) -> inflight()).
|
||||
update(Key, Val, {?MODULE, MaxSize, Tree}) ->
|
||||
{?MODULE, MaxSize, gb_trees:update(Key, Val, Tree)}.
|
||||
-spec(update(key(), Val :: term(), inflight()) -> inflight()).
|
||||
update(Key, Val, ?Inflight(MaxSize, Tree)) ->
|
||||
?Inflight(MaxSize, gb_trees:update(Key, Val, Tree)).
|
||||
|
||||
-spec(update_size(integer(), inflight()) -> inflight()).
|
||||
update_size(MaxSize, {?MODULE, _OldMaxSize, Tree}) ->
|
||||
{?MODULE, MaxSize, Tree}.
|
||||
update_size(MaxSize, ?Inflight(Tree)) ->
|
||||
?Inflight(MaxSize, Tree).
|
||||
|
||||
-spec(is_full(inflight()) -> boolean()).
|
||||
is_full({?MODULE, 0, _Tree}) ->
|
||||
is_full(?Inflight(0, _Tree)) ->
|
||||
false;
|
||||
is_full({?MODULE, MaxSize, Tree}) ->
|
||||
is_full(?Inflight(MaxSize, Tree)) ->
|
||||
MaxSize =< gb_trees:size(Tree).
|
||||
|
||||
-spec(is_empty(inflight()) -> boolean()).
|
||||
is_empty({?MODULE, _MaxSize, Tree}) ->
|
||||
is_empty(?Inflight(Tree)) ->
|
||||
gb_trees:is_empty(Tree).
|
||||
|
||||
-spec(smallest(inflight()) -> {K :: term(), V :: term()}).
|
||||
smallest({?MODULE, _MaxSize, Tree}) ->
|
||||
-spec(smallest(inflight()) -> {key(), term()}).
|
||||
smallest(?Inflight(Tree)) ->
|
||||
gb_trees:smallest(Tree).
|
||||
|
||||
-spec(largest(inflight()) -> {K :: term(), V :: term()}).
|
||||
largest({?MODULE, _MaxSize, Tree}) ->
|
||||
-spec(largest(inflight()) -> {key(), term()}).
|
||||
largest(?Inflight(Tree)) ->
|
||||
gb_trees:largest(Tree).
|
||||
|
||||
-spec(values(inflight()) -> list()).
|
||||
values({?MODULE, _MaxSize, Tree}) ->
|
||||
values(?Inflight(Tree)) ->
|
||||
gb_trees:values(Tree).
|
||||
|
||||
-spec(to_list(inflight()) -> list({K :: term(), V :: term()})).
|
||||
to_list({?MODULE, _MaxSize, Tree}) ->
|
||||
-spec(to_list(inflight()) -> list({key(), term()})).
|
||||
to_list(?Inflight(Tree)) ->
|
||||
gb_trees:to_list(Tree).
|
||||
|
||||
-spec(window(inflight()) -> list()).
|
||||
window(Inflight = {?MODULE, _MaxSize, Tree}) ->
|
||||
window(Inflight = ?Inflight(Tree)) ->
|
||||
case gb_trees:is_empty(Tree) of
|
||||
true -> [];
|
||||
true -> [];
|
||||
false -> [Key || {Key, _Val} <- [smallest(Inflight), largest(Inflight)]]
|
||||
end.
|
||||
|
||||
-spec(size(inflight()) -> non_neg_integer()).
|
||||
size({?MODULE, _MaxSize, Tree}) ->
|
||||
size(?Inflight(Tree)) ->
|
||||
gb_trees:size(Tree).
|
||||
|
||||
-spec(max_size(inflight()) -> non_neg_integer()).
|
||||
max_size({?MODULE, MaxSize, _Tree}) ->
|
||||
max_size(?Inflight(MaxSize, _Tree)) ->
|
||||
MaxSize.
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
|
||||
|
||||
-type(keepalive() :: #keepalive{}).
|
||||
-opaque(keepalive() :: #keepalive{}).
|
||||
|
||||
-export_type([keepalive/0]).
|
||||
|
||||
|
|
|
@ -48,11 +48,13 @@ make(From, QoS, Topic, Payload) ->
|
|||
payload = Payload,
|
||||
timestamp = os:timestamp()}.
|
||||
|
||||
-spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()).
|
||||
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
|
||||
Msg#message{flags = Flags};
|
||||
set_flags(New, Msg = #message{flags = Old}) when is_map(New) ->
|
||||
Msg#message{flags = maps:merge(Old, New)}.
|
||||
|
||||
-spec(get_flag(flag(), emqx_types:message()) -> boolean()).
|
||||
get_flag(Flag, Msg) ->
|
||||
get_flag(Flag, Msg, false).
|
||||
get_flag(Flag, #message{flags = Flags}, Default) ->
|
||||
|
@ -73,20 +75,26 @@ set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) ->
|
|||
|
||||
-spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()).
|
||||
unset_flag(Flag, Msg = #message{flags = Flags}) ->
|
||||
Msg#message{flags = maps:remove(Flag, Flags)}.
|
||||
case maps:is_key(Flag, Flags) of
|
||||
true ->
|
||||
Msg#message{flags = maps:remove(Flag, Flags)};
|
||||
false -> Msg
|
||||
end.
|
||||
|
||||
-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
|
||||
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
|
||||
Msg#message{headers = Headers};
|
||||
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
|
||||
Msg#message{headers = maps:merge(Old, New)};
|
||||
set_headers(_, Msg) ->
|
||||
Msg.
|
||||
Msg#message{headers = maps:merge(Old, New)}.
|
||||
|
||||
-spec(get_header(term(), emqx_types:message()) -> term()).
|
||||
get_header(Hdr, Msg) ->
|
||||
get_header(Hdr, Msg, undefined).
|
||||
-spec(get_header(term(), emqx_types:message(), Default :: term()) -> term()).
|
||||
get_header(Hdr, #message{headers = Headers}, Default) ->
|
||||
maps:get(Hdr, Headers, Default).
|
||||
|
||||
-spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()).
|
||||
set_header(Hdr, Val, Msg = #message{headers = undefined}) ->
|
||||
Msg#message{headers = #{Hdr => Val}};
|
||||
set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
|
||||
|
@ -98,13 +106,13 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam
|
|||
is_expired(_Msg) ->
|
||||
false.
|
||||
|
||||
-spec(update_expiry(emqx_types:message()) -> emqx_types:message()).
|
||||
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||
case elapsed(CreatedAt) of
|
||||
Elapsed when Elapsed > 0 ->
|
||||
set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg);
|
||||
_ -> Msg
|
||||
end;
|
||||
|
||||
update_expiry(Msg) -> Msg.
|
||||
|
||||
remove_topic_alias(Msg = #message{headers = Headers}) ->
|
||||
|
|
|
@ -34,6 +34,8 @@
|
|||
-export([send/2]).
|
||||
-export([shutdown/2]).
|
||||
|
||||
-export_type([state/0]).
|
||||
|
||||
-record(pstate, {
|
||||
zone,
|
||||
sendfun,
|
||||
|
@ -66,8 +68,7 @@
|
|||
topic_alias_maximum
|
||||
}).
|
||||
|
||||
-type(state() :: #pstate{}).
|
||||
-export_type([state/0]).
|
||||
-opaque(state() :: #pstate{}).
|
||||
|
||||
-ifdef(TEST).
|
||||
-compile(export_all).
|
||||
|
|
|
@ -75,15 +75,19 @@ mnesia(copy) ->
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
-spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
|
||||
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||
gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
|
||||
|
||||
-spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
|
||||
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
||||
|
||||
record(Group, Topic, SubPid) ->
|
||||
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
||||
|
||||
-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
|
||||
-> emqx_types:delivery()).
|
||||
dispatch(Group, Topic, Delivery) ->
|
||||
dispatch(Group, Topic, Delivery, _FailedSubs = []).
|
||||
|
||||
|
@ -173,12 +177,12 @@ get_ack_ref(Msg) ->
|
|||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||
is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg).
|
||||
|
||||
%% @doc Negative ack dropped message due to message queue being full.
|
||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||
-spec(maybe_nack_dropped(emqx_types:message()) -> ok).
|
||||
maybe_nack_dropped(Msg) ->
|
||||
case get_ack_ref(Msg) of
|
||||
?no_ack -> ok;
|
||||
{Sender, Ref} -> nack(Sender, Ref, drpped)
|
||||
{Sender, Ref} -> nack(Sender, Ref, dropped)
|
||||
end.
|
||||
|
||||
%% @doc Negative ack message due to connection down.
|
||||
|
@ -325,9 +329,9 @@ terminate(_Reason, _State) ->
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% keep track of alive remote pids
|
||||
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
|
||||
|
|
|
@ -25,12 +25,13 @@
|
|||
-export([systop/1]).
|
||||
-export([parse/1, parse/2]).
|
||||
|
||||
-type(group() :: binary()).
|
||||
-type(topic() :: binary()).
|
||||
-type(word() :: '' | '+' | '#' | binary()).
|
||||
-type(words() :: list(word())).
|
||||
-type(triple() :: {root | binary(), word(), binary()}).
|
||||
-opaque(triple() :: {root | binary(), word(), binary()}).
|
||||
|
||||
-export_type([topic/0, word/0, triple/0]).
|
||||
-export_type([group/0, topic/0, word/0, triple/0]).
|
||||
|
||||
-define(MAX_TOPIC_LEN, 4096).
|
||||
|
||||
|
|
|
@ -33,11 +33,13 @@ all() ->
|
|||
].
|
||||
|
||||
message_make(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"payload">>),
|
||||
?assertEqual(0, Msg#message.qos),
|
||||
Msg1 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
|
||||
?assert(is_binary(Msg1#message.id)),
|
||||
?assertEqual(2, Msg1#message.qos).
|
||||
Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
?assertEqual(0, Msg1#message.qos),
|
||||
Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
|
||||
?assert(is_binary(Msg2#message.id)),
|
||||
?assertEqual(2, Msg2#message.qos).
|
||||
|
||||
message_flag(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
|
|
Loading…
Reference in New Issue