commit
1256ce1500
|
@ -8,8 +8,10 @@ before_install:
|
|||
|
||||
script:
|
||||
- make dep-vsn-check
|
||||
- make rebar-compile
|
||||
- make rebar-eunit
|
||||
- make rebar-ct
|
||||
- make rebar-cover
|
||||
- make coveralls
|
||||
|
||||
sudo: false
|
||||
|
|
13
Makefile
13
Makefile
|
@ -18,7 +18,7 @@ dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1
|
|||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
|
||||
ERLC_OPTS += +debug_info
|
||||
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
BUILD_DEPS = cuttlefish
|
||||
|
@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
|
|||
#TEST_DEPS = emqx_ct_helplers
|
||||
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
|
||||
|
||||
TEST_ERLC_OPTS += +debug_info
|
||||
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
EUNIT_OPTS = verbose
|
||||
|
@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access
|
|||
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
|
||||
emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
|
||||
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
|
||||
emqx_mountpoint emqx_listeners emqx_protocol
|
||||
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool
|
||||
|
||||
CT_NODE_NAME = emqxct@127.0.0.1
|
||||
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
||||
|
@ -60,7 +60,7 @@ gen-clean:
|
|||
@rm -f etc/gen.emqx.conf
|
||||
|
||||
bbmustache:
|
||||
$(verbose) git clone https://github.com/soranoba/bbmustache.git && pushd bbmustache && ./rebar3 compile && popd
|
||||
$(verbose) git clone https://github.com/soranoba/bbmustache.git && cd bbmustache && ./rebar3 compile && cd ..
|
||||
|
||||
# This hack is to generate a conf file for testing
|
||||
# relx overlay is used for release
|
||||
|
@ -78,6 +78,9 @@ app.config: etc/gen.emqx.conf
|
|||
|
||||
ct: cuttlefish app.config
|
||||
|
||||
rebar-cover:
|
||||
@rebar3 cover
|
||||
|
||||
coveralls:
|
||||
@rebar3 coveralls send
|
||||
|
||||
|
@ -91,7 +94,7 @@ rebar-cuttlefish: rebar-deps
|
|||
rebar-deps:
|
||||
@rebar3 get-deps
|
||||
|
||||
rebar-eunit:
|
||||
rebar-eunit: rebar-cuttlefish
|
||||
@rebar3 eunit
|
||||
|
||||
rebar-compile:
|
||||
|
|
10
rebar.config
10
rebar.config
|
@ -20,7 +20,8 @@
|
|||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
debug_info,
|
||||
{parse_transform, lager_transform}]}.
|
||||
{parse_transform, lager_transform},
|
||||
{d, 'APPLICATION', emqx}]}.
|
||||
{xref_checks, [undefined_function_calls, undefined_functions,
|
||||
locals_not_used, deprecated_function_calls,
|
||||
warnings_as_errors, deprecated_functions]}.
|
||||
|
@ -29,10 +30,5 @@
|
|||
{cover_export_enabled, true}.
|
||||
|
||||
%% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish
|
||||
{plugins, [rebar3_neotoma_plugin]}.
|
||||
{plugins, [coveralls, rebar3_neotoma_plugin]}.
|
||||
|
||||
%% Do not include cuttlefish's dependencies as mine
|
||||
%% its dependencies are only fetched to compile itself
|
||||
%% they are however not needed by emqx
|
||||
{overrides, [{override, cuttlefish, [{deps, []}]}
|
||||
]}.
|
||||
|
|
|
@ -3,7 +3,6 @@ CONFIG1 = case os:getenv("TRAVIS") of
|
|||
"true" ->
|
||||
JobId = os:getenv("TRAVIS_JOB_ID"),
|
||||
[{coveralls_service_job_id, JobId},
|
||||
{plugins, [coveralls]},
|
||||
{coveralls_coverdata, "_build/test/cover/*.coverdata"},
|
||||
{coveralls_service_name , "travis-ci"} | CONFIG];
|
||||
_ ->
|
||||
|
|
|
@ -303,7 +303,7 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
|
|||
idle_timeout = IdleTimeout}) ->
|
||||
case catch emqx_frame:parse(Data, ParserState) of
|
||||
{more, NewParserState} ->
|
||||
{noreply, State#state{parser_state = NewParserState}, IdleTimeout};
|
||||
{noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
|
||||
{ok, Packet = ?PACKET(Type), Rest} ->
|
||||
emqx_metrics:received(Packet),
|
||||
case emqx_protocol:received(Packet, ProtoState) of
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
|
||||
-export([set_headers/2]).
|
||||
-export([get_header/2, get_header/3, set_header/3]).
|
||||
-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]).
|
||||
-export([is_expired/1, update_expiry/1]).
|
||||
-export([format/1]).
|
||||
|
||||
-type(flag() :: atom()).
|
||||
|
@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam
|
|||
is_expired(_Msg) ->
|
||||
false.
|
||||
|
||||
-spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false).
|
||||
check_expiry(Msg = #message{timestamp = CreatedAt}) ->
|
||||
check_expiry(Msg, CreatedAt);
|
||||
check_expiry(_Msg) ->
|
||||
false.
|
||||
|
||||
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
|
||||
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
|
||||
case Interval - (elapsed(Since) div 1000) of
|
||||
Timeout when Timeout > 0 -> {ok, Timeout};
|
||||
_ -> expired
|
||||
end;
|
||||
check_expiry(_Msg, _Since) ->
|
||||
false.
|
||||
|
||||
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||
case elapsed(CreatedAt) of
|
||||
Elapsed when Elapsed > 0 ->
|
||||
|
@ -138,4 +123,3 @@ format(flags, Flags) ->
|
|||
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
||||
format(headers, Headers) ->
|
||||
io_lib:format("~p", [Headers]).
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
%%
|
||||
%% This module implements a simple in-memory queue for MQTT persistent session.
|
||||
%%
|
||||
%% If the broker restarted or crashed, all the messages queued will be gone.
|
||||
%% If the broker restarts or crashes, all queued messages will be lost.
|
||||
%%
|
||||
%% Concept of Message Queue and Inflight Window:
|
||||
%%
|
||||
|
@ -29,12 +29,15 @@
|
|||
%% |<--- Win Size --->|
|
||||
%%
|
||||
%%
|
||||
%% 1. Inflight Window to store the messages delivered and awaiting for puback.
|
||||
%% 1. Inflight Window is to store the messages
|
||||
%% that are delivered but still awaiting for puback.
|
||||
%%
|
||||
%% 2. Enqueue messages when the inflight window is full.
|
||||
%% 2. Messages are enqueued to tail when the inflight window is full.
|
||||
%%
|
||||
%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
|
||||
%% otherwise dropped the oldest one.
|
||||
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
|
||||
%% in init options
|
||||
%%
|
||||
%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'.
|
||||
%%
|
||||
%% @end
|
||||
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/0, start_link/2]).
|
||||
-export([submit/1, async_submit/1]).
|
||||
-export([submit/1, submit/2]).
|
||||
-export([async_submit/1, async_submit/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
|
@ -25,6 +26,8 @@
|
|||
|
||||
-define(POOL, ?MODULE).
|
||||
|
||||
-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}).
|
||||
|
||||
%% @doc Start pooler supervisor.
|
||||
start_link() ->
|
||||
emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}).
|
||||
|
@ -34,18 +37,35 @@ start_link() ->
|
|||
start_link(Pool, Id) ->
|
||||
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||
|
||||
%% @doc Submit work to the pool
|
||||
-spec(submit(fun()) -> any()).
|
||||
submit(Fun) ->
|
||||
gen_server:call(worker(), {submit, Fun}, infinity).
|
||||
%% @doc Submit work to the pool.
|
||||
-spec(submit(task()) -> any()).
|
||||
submit(Task) ->
|
||||
call({submit, Task}).
|
||||
|
||||
%% @doc Submit work to the pool asynchronously
|
||||
-spec(async_submit(fun()) -> ok).
|
||||
async_submit(Fun) ->
|
||||
gen_server:cast(worker(), {async_submit, Fun}).
|
||||
-spec(submit(fun(), list(any())) -> any()).
|
||||
submit(Fun, Args) ->
|
||||
call({submit, {Fun, Args}}).
|
||||
|
||||
%% @private
|
||||
call(Req) ->
|
||||
gen_server:call(worker(), Req, infinity).
|
||||
|
||||
%% @doc Submit work to the pool asynchronously.
|
||||
-spec(async_submit(task()) -> ok).
|
||||
async_submit(Task) ->
|
||||
cast({async_submit, Task}).
|
||||
|
||||
-spec(async_submit(fun(), list(any())) -> ok).
|
||||
async_submit(Fun, Args) ->
|
||||
cast({async_submit, {Fun, Args}}).
|
||||
|
||||
%% @private
|
||||
cast(Msg) ->
|
||||
gen_server:cast(worker(), Msg).
|
||||
|
||||
%% @private
|
||||
worker() ->
|
||||
gproc_pool:pick_worker(pool).
|
||||
gproc_pool:pick_worker(?POOL).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
|
@ -55,15 +75,15 @@ init([Pool, Id]) ->
|
|||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
{ok, #{pool => Pool, id => Id}}.
|
||||
|
||||
handle_call({submit, Fun}, _From, State) ->
|
||||
{reply, catch run(Fun), State};
|
||||
handle_call({submit, Task}, _From, State) ->
|
||||
{reply, catch run(Task), State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Pool] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({async_submit, Fun}, State) ->
|
||||
try run(Fun)
|
||||
handle_cast({async_submit, Task}, State) ->
|
||||
try run(Task)
|
||||
catch _:Error:Stacktrace ->
|
||||
emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace])
|
||||
end,
|
||||
|
@ -78,7 +98,7 @@ handle_info(Info, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||
true = gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
||||
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
@ -89,6 +109,8 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
run({M, F, A}) ->
|
||||
erlang:apply(M, F, A);
|
||||
run({F, A}) when is_function(F), is_list(A) ->
|
||||
erlang:apply(F, A);
|
||||
run(Fun) when is_function(Fun) ->
|
||||
Fun().
|
||||
|
||||
|
|
|
@ -49,8 +49,18 @@
|
|||
|
||||
-define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)).
|
||||
|
||||
-define(PAYLOAD, [{type,"dsmSimulationData"},
|
||||
{id, 9999},
|
||||
{status, "running"},
|
||||
{soc, 1536702170},
|
||||
{fracsec, 451000},
|
||||
{data, lists:seq(1, 20480)}]).
|
||||
|
||||
-define(BIG_PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, emqx_json:encode(?PAYLOAD))).
|
||||
|
||||
all() ->
|
||||
[{group, connect}].
|
||||
[{group, connect},
|
||||
{group, publish}].
|
||||
|
||||
groups() ->
|
||||
[{connect, [non_parallel_tests],
|
||||
|
@ -60,6 +70,10 @@ groups() ->
|
|||
mqtt_connect_with_ssl_oneway,
|
||||
mqtt_connect_with_ssl_twoway,
|
||||
mqtt_connect_with_ws
|
||||
]},
|
||||
{publish, [non_parallel_tests],
|
||||
[
|
||||
packet_size
|
||||
]}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
|
@ -157,6 +171,21 @@ mqtt_connect_with_ws(_Config) ->
|
|||
{close, _} = rfc6455_client:close(WS),
|
||||
ok.
|
||||
|
||||
%%issue 1811
|
||||
packet_size(_Config) ->
|
||||
{ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
|
||||
Packet = raw_send_serialise(?CLIENT),
|
||||
emqx_client_sock:send(Sock, Packet),
|
||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
|
||||
|
||||
%% Pub Packet QoS 1
|
||||
PubPacket = raw_send_serialise(?BIG_PUBPACKET),
|
||||
emqx_client_sock:send(Sock, PubPacket),
|
||||
{ok, Data1} = gen_tcp:recv(Sock, 0),
|
||||
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1),
|
||||
emqx_client_sock:close(Sock).
|
||||
|
||||
raw_send_serialise(Packet) ->
|
||||
emqx_frame:serialize(Packet).
|
||||
|
||||
|
|
|
@ -68,11 +68,8 @@ message_expired(_) ->
|
|||
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
|
||||
timer:sleep(500),
|
||||
?assertNot(emqx_message:is_expired(Msg1)),
|
||||
{ok, 1} = emqx_message:check_expiry(Msg1),
|
||||
timer:sleep(600),
|
||||
?assert(emqx_message:is_expired(Msg1)),
|
||||
expired = emqx_message:check_expiry(Msg1),
|
||||
timer:sleep(1000),
|
||||
Msg2 = emqx_message:update_expiry(Msg1),
|
||||
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).
|
||||
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
-module(emqx_pool_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> [
|
||||
{group, submit_case},
|
||||
{group, async_submit_case}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{submit_case, [sequence], [submit_mfa, submit_fa]},
|
||||
{async_submit_case, [sequence], [async_submit_mfa]}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:ensure_all_started(gproc),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok.
|
||||
|
||||
submit_mfa(_Config) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
{ok, Pid} = emqx_pool:start_link(),
|
||||
Result = emqx_pool:submit({?MODULE, test_mfa, []}),
|
||||
?assertEqual(15, Result),
|
||||
gen_server:stop(Pid, normal, 3000),
|
||||
ok.
|
||||
|
||||
submit_fa(_Config) ->
|
||||
{ok, Pid} = emqx_pool:start_link(),
|
||||
Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end,
|
||||
Result = emqx_pool:submit(Fun, [2]),
|
||||
?assertEqual({true, 1}, Result),
|
||||
exit(Pid, normal).
|
||||
|
||||
test_mfa() ->
|
||||
lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
|
||||
|
||||
async_submit_mfa(_Config) ->
|
||||
{ok, Pid} = emqx_pool:start_link(),
|
||||
emqx_pool:async_submit({?MODULE, test_mfa, []}),
|
||||
exit(Pid, normal).
|
||||
|
Loading…
Reference in New Issue