Merge pull request #6095 from zmstone/sync-v4.3-to-v4.4
Sync v4.3 to v4.4
This commit is contained in:
commit
70bc5f21e4
|
@ -1,4 +1,4 @@
|
||||||
name: Elvis Linter
|
name: Code style check
|
||||||
|
|
||||||
on: [pull_request]
|
on: [pull_request]
|
||||||
|
|
||||||
|
@ -7,10 +7,18 @@ jobs:
|
||||||
runs-on: ubuntu-20.04
|
runs-on: ubuntu-20.04
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
with:
|
||||||
|
fetch-depth: 1000
|
||||||
- name: Set git token
|
- name: Set git token
|
||||||
if: endsWith(github.repository, 'enterprise')
|
if: endsWith(github.repository, 'enterprise')
|
||||||
run: |
|
run: |
|
||||||
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
|
||||||
git config --global credential.helper store
|
git config --global credential.helper store
|
||||||
- run: |
|
- name: Run elvis check
|
||||||
./scripts/elvis-check.sh $GITHUB_BASE_REF
|
run: |
|
||||||
|
set -e
|
||||||
|
if [ -f EMQX_ENTERPRISE ]; then
|
||||||
|
./scripts/elvis-check.sh $GITHUB_BASE_REF emqx-enterprise
|
||||||
|
else
|
||||||
|
./scripts/elvis-check.sh $GITHUB_BASE_REF emqx
|
||||||
|
fi
|
|
@ -24,6 +24,7 @@ jobs:
|
||||||
git config --global credential.helper store
|
git config --global credential.helper store
|
||||||
echo "${{ secrets.CI_GIT_TOKEN }}" >> scripts/git-token
|
echo "${{ secrets.CI_GIT_TOKEN }}" >> scripts/git-token
|
||||||
make deps-emqx-ee
|
make deps-emqx-ee
|
||||||
|
make clean
|
||||||
fi
|
fi
|
||||||
make docker
|
make docker
|
||||||
echo "::set-output name=version::$(./pkg-vsn.sh)"
|
echo "::set-output name=version::$(./pkg-vsn.sh)"
|
||||||
|
@ -95,7 +96,7 @@ jobs:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: integration_test_suites
|
ref: v1.4.0
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v1
|
- uses: actions/setup-java@v1
|
||||||
with:
|
with:
|
||||||
|
@ -188,7 +189,7 @@ jobs:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: integration_test_suites
|
ref: v1.4.0
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v1
|
- uses: actions/setup-java@v1
|
||||||
with:
|
with:
|
||||||
|
@ -292,7 +293,7 @@ jobs:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: integration_test_suites
|
ref: v1.4.0
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v1
|
- uses: actions/setup-java@v1
|
||||||
with:
|
with:
|
||||||
|
@ -390,7 +391,7 @@ jobs:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: integration_test_suites
|
ref: v1.4.0
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v1
|
- uses: actions/setup-java@v1
|
||||||
with:
|
with:
|
||||||
|
|
|
@ -223,7 +223,7 @@ jobs:
|
||||||
- name: Generate matrix
|
- name: Generate matrix
|
||||||
id: generate-matrix
|
id: generate-matrix
|
||||||
run: |
|
run: |
|
||||||
matrix=$(echo -n "$OLD_VSNS" | jq -R -s -c 'split(" ")')
|
matrix=$(echo -n "$OLD_VSNS" | sed 's/ $//g' | jq -R -s -c 'split(" ")')
|
||||||
echo "::set-output name=matrix::$matrix"
|
echo "::set-output name=matrix::$matrix"
|
||||||
|
|
||||||
relup_test_build:
|
relup_test_build:
|
||||||
|
@ -275,6 +275,7 @@ jobs:
|
||||||
runs-on: ubuntu-20.04
|
runs-on: ubuntu-20.04
|
||||||
container: emqx/relup-test-env:erl23.2.7.2-emqx-2-ubuntu20.04
|
container: emqx/relup-test-env:erl23.2.7.2-emqx-2-ubuntu20.04
|
||||||
strategy:
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
old_vsn: ${{ fromJson(needs.relup_test_plan.outputs.matrix) }}
|
old_vsn: ${{ fromJson(needs.relup_test_plan.outputs.matrix) }}
|
||||||
env:
|
env:
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
{deps,
|
{deps,
|
||||||
[{epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}}
|
[{epgsql, {git, "https://github.com/epgsql/epgsql.git", {tag, "4.4.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{erl_opts, [warn_unused_vars,
|
{erl_opts, [warn_unused_vars,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_exproto,
|
{application, emqx_exproto,
|
||||||
[{description, "EMQ X Extension for Protocol"},
|
[{description, "EMQ X Extension for Protocol"},
|
||||||
{vsn, "4.3.2"}, %% strict semver
|
{vsn, "4.3.4"}, %% 4.3.3 is used by ee
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_exproto_app, []}},
|
{mod, {emqx_exproto_app, []}},
|
||||||
|
|
|
@ -1,31 +1,26 @@
|
||||||
%% -*-: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[{"4.3.3",
|
||||||
{"4.3.1", [
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
|
{"4.3.2",
|
||||||
{load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0", [
|
{<<"4.3.[0-1]">>,
|
||||||
{load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{<<".*">>,[]}],
|
||||||
{<<".*">>, []}
|
[{"4.3.3",
|
||||||
],
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
[
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1", [
|
{"4.3.2",
|
||||||
{load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
|
{<<"4.3.[0-1]">>,
|
||||||
]},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
{"4.3.0", [
|
|
||||||
{load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
|
|
||||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{<<".*">>,[]}]}.
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
|
||||||
}.
|
|
||||||
|
|
|
@ -94,6 +94,9 @@
|
||||||
awaiting_rel_max
|
awaiting_rel_max
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(CHANMOCK(P), {exproto_anonymous_client, P}).
|
||||||
|
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -155,13 +158,19 @@ init(ConnInfo = #{socktype := Socktype,
|
||||||
conn_state = connecting,
|
conn_state = connecting,
|
||||||
timers = #{}
|
timers = #{}
|
||||||
},
|
},
|
||||||
|
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
|
||||||
|
{error, _Reason} ->
|
||||||
|
throw(nopermission);
|
||||||
|
_ ->
|
||||||
|
ConnMod = maps:get(conn_mod, NConnInfo),
|
||||||
|
true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
|
||||||
Req = #{conninfo =>
|
Req = #{conninfo =>
|
||||||
peercert(Peercert,
|
peercert(Peercert,
|
||||||
#{socktype => socktype(Socktype),
|
#{socktype => socktype(Socktype),
|
||||||
peername => address(Peername),
|
peername => address(Peername),
|
||||||
sockname => address(Sockname)})},
|
sockname => address(Sockname)})},
|
||||||
try_dispatch(on_socket_created, wrap(Req), Channel).
|
try_dispatch(on_socket_created, wrap(Req), Channel)
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
|
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
|
||||||
|
@ -283,6 +292,7 @@ handle_call({auth, ClientInfo0, Password},
|
||||||
emqx_metrics:inc('client.auth.anonymous'),
|
emqx_metrics:inc('client.auth.anonymous'),
|
||||||
NClientInfo = maps:merge(ClientInfo1, AuthResult),
|
NClientInfo = maps:merge(ClientInfo1, AuthResult),
|
||||||
NChannel = Channel1#channel{clientinfo = NClientInfo},
|
NChannel = Channel1#channel{clientinfo = NClientInfo},
|
||||||
|
clean_anonymous_clients(),
|
||||||
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
|
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
|
||||||
{ok, _Session} ->
|
{ok, _Session} ->
|
||||||
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
||||||
|
@ -399,12 +409,16 @@ handle_info(Info, Channel) ->
|
||||||
|
|
||||||
-spec(terminate(any(), channel()) -> channel()).
|
-spec(terminate(any(), channel()) -> channel()).
|
||||||
terminate(Reason, Channel) ->
|
terminate(Reason, Channel) ->
|
||||||
|
clean_anonymous_clients(),
|
||||||
Req = #{reason => stringfy(Reason)},
|
Req = #{reason => stringfy(Reason)},
|
||||||
try_dispatch(on_socket_closed, wrap(Req), Channel).
|
try_dispatch(on_socket_closed, wrap(Req), Channel).
|
||||||
|
|
||||||
is_anonymous(#{anonymous := true}) -> true;
|
is_anonymous(#{anonymous := true}) -> true;
|
||||||
is_anonymous(_AuthResult) -> false.
|
is_anonymous(_AuthResult) -> false.
|
||||||
|
|
||||||
|
clean_anonymous_clients() ->
|
||||||
|
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Sub/UnSub
|
%% Sub/UnSub
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -577,7 +591,6 @@ default_conninfo(ConnInfo) ->
|
||||||
ConnInfo#{clean_start => true,
|
ConnInfo#{clean_start => true,
|
||||||
clientid => undefined,
|
clientid => undefined,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
conn_mod => undefined,
|
|
||||||
conn_props => #{},
|
conn_props => #{},
|
||||||
connected => true,
|
connected => true,
|
||||||
connected_at => erlang:system_time(millisecond),
|
connected_at => erlang:system_time(millisecond),
|
||||||
|
|
|
@ -233,7 +233,11 @@ init(Parent, WrappedSock, Peername0, Options) ->
|
||||||
case esockd_wait(WrappedSock) of
|
case esockd_wait(WrappedSock) of
|
||||||
{ok, NWrappedSock} ->
|
{ok, NWrappedSock} ->
|
||||||
Peername = esockd_peername(NWrappedSock, Peername0),
|
Peername = esockd_peername(NWrappedSock, Peername0),
|
||||||
run_loop(Parent, init_state(NWrappedSock, Peername, Options));
|
try
|
||||||
|
run_loop(Parent, init_state(NWrappedSock, Peername, Options))
|
||||||
|
catch
|
||||||
|
throw : nopermission -> erlang:exit(normal)
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
ok = esockd_close(WrappedSock),
|
ok = esockd_close(WrappedSock),
|
||||||
exit_on_sock_error(Reason)
|
exit_on_sock_error(Reason)
|
||||||
|
@ -601,9 +605,9 @@ handle_outgoing(IoData, State = #state{socket = Socket}) ->
|
||||||
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||||
case activate_socket(State) of
|
case activate_socket(State) of
|
||||||
{ok, NState = #state{sockstate = NewSst}} ->
|
{ok, NState = #state{sockstate = NewSst}} ->
|
||||||
if OldSst =/= NewSst ->
|
case OldSst =/= NewSst of
|
||||||
{ok, {event, NewSst}, NState};
|
true -> {ok, {event, NewSst}, NState};
|
||||||
true -> {ok, NState}
|
false -> {ok, NState}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
handle_info({sock_error, Reason}, State)
|
handle_info({sock_error, Reason}, State)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_rule_engine,
|
{application, emqx_rule_engine,
|
||||||
[{description, "EMQ X Rule Engine"},
|
[{description, "EMQ X Rule Engine"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||||
|
|
|
@ -1,64 +1,52 @@
|
||||||
%% -*-: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{"4.3.5",
|
{VSN,
|
||||||
[ {"4.3.0",
|
[{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.3.1",
|
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.3.2",
|
|
||||||
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
|
||||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.3.3",
|
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{"4.3.4",
|
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
|
||||||
]},
|
|
||||||
{<<".*">>, []}
|
|
||||||
],
|
|
||||||
[
|
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
|
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{<<".*">>,[]}],
|
||||||
{<<".*">>, []}
|
[{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
]
|
{"4.3.0",
|
||||||
}.
|
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.1",
|
||||||
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.2",
|
||||||
|
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
|
||||||
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.3",
|
||||||
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.4",
|
||||||
|
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_stomp,
|
{application, emqx_stomp,
|
||||||
[{description, "EMQ X Stomp Protocol Plugin"},
|
[{description, "EMQ X Stomp Protocol Plugin"},
|
||||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_stomp_sup]},
|
{registered, [emqx_stomp_sup]},
|
||||||
{applications, [kernel,stdlib]},
|
{applications, [kernel,stdlib]},
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
%% -*- mode: erlang -*-
|
||||||
|
{VSN,
|
||||||
|
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.0",
|
||||||
|
[{restart_application,emqx_stomp}]},
|
||||||
|
{<<".*">>,[]}],
|
||||||
|
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.0",
|
||||||
|
[{restart_application,emqx_stomp}]},
|
||||||
|
{<<".*">>,[]}]}.
|
|
@ -20,9 +20,15 @@
|
||||||
|
|
||||||
-include("emqx_stomp.hrl").
|
-include("emqx_stomp.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/types.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-logger_header("[Stomp-Conn]").
|
-logger_header("[Stomp-Conn]").
|
||||||
|
|
||||||
|
-import(emqx_misc,
|
||||||
|
[ start_timer/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export([ start_link/3
|
-export([ start_link/3
|
||||||
, info/1
|
, info/1
|
||||||
]).
|
]).
|
||||||
|
@ -37,56 +43,177 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% for protocol
|
%% for protocol
|
||||||
-export([send/4, heartbeat/2]).
|
-export([send/4, heartbeat/2, statfun/3]).
|
||||||
|
|
||||||
-record(state, {transport, socket, peername, conn_name, conn_state,
|
%% for mgmt
|
||||||
await_recv, rate_limit, parser, pstate,
|
-export([call/2, call/3]).
|
||||||
proto_env, heartbeat}).
|
|
||||||
|
|
||||||
-define(INFO_KEYS, [peername, await_recv, conn_state]).
|
-record(state, {
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
%% TCP/TLS Transport
|
||||||
|
transport :: esockd:transport(),
|
||||||
|
%% TCP/TLS Socket
|
||||||
|
socket :: esockd:socket(),
|
||||||
|
%% Peername of the connection
|
||||||
|
peername :: emqx_types:peername(),
|
||||||
|
%% Sockname of the connection
|
||||||
|
sockname :: emqx_types:peername(),
|
||||||
|
%% Sock State
|
||||||
|
sockstate :: emqx_types:sockstate(),
|
||||||
|
%% The {active, N} option
|
||||||
|
active_n :: pos_integer(),
|
||||||
|
%% Limiter
|
||||||
|
limiter :: maybe(emqx_limiter:limiter()),
|
||||||
|
%% Limit Timer
|
||||||
|
limit_timer :: maybe(reference()),
|
||||||
|
%% GC State
|
||||||
|
gc_state :: maybe(emqx_gc:gc_state()),
|
||||||
|
%% Stats Timer
|
||||||
|
stats_timer :: disabled | maybe(reference()),
|
||||||
|
%% Parser State
|
||||||
|
parser :: emqx_stomp_frame:parser(),
|
||||||
|
%% Protocol State
|
||||||
|
pstate :: emqx_stomp_protocol:pstate(),
|
||||||
|
%% XXX: some common confs
|
||||||
|
proto_env :: list()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type(state() :: #state{}).
|
||||||
|
|
||||||
|
-define(DEFAULT_GC_POLICY, #{bytes => 16777216, count => 16000}).
|
||||||
|
-define(DEFAULT_OOM_POLICY, #{ max_heap_size => 8388608,
|
||||||
|
message_queue_len => 10000}).
|
||||||
|
|
||||||
|
-define(ACTIVE_N, 100).
|
||||||
|
-define(IDLE_TIMEOUT, 30000).
|
||||||
|
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
|
||||||
|
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
|
||||||
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||||
|
|
||||||
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_stomp_connection]}}]).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, [ ensure_stats_timer/2
|
||||||
|
]}).
|
||||||
|
|
||||||
|
-dialyzer({no_return, [ init/1
|
||||||
|
, init_state/3
|
||||||
|
]}).
|
||||||
|
|
||||||
start_link(Transport, Sock, ProtoEnv) ->
|
start_link(Transport, Sock, ProtoEnv) ->
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
||||||
|
|
||||||
info(CPid) ->
|
-spec info(pid() | state()) -> emqx_types:infos().
|
||||||
gen_server:call(CPid, info, infinity).
|
info(CPid) when is_pid(CPid) ->
|
||||||
|
call(CPid, info);
|
||||||
|
info(State = #state{pstate = PState}) ->
|
||||||
|
ChanInfo = emqx_stomp_protocol:info(PState),
|
||||||
|
SockInfo = maps:from_list(
|
||||||
|
info(?INFO_KEYS, State)),
|
||||||
|
ChanInfo#{sockinfo => SockInfo}.
|
||||||
|
|
||||||
init([Transport, Sock, ProtoEnv]) ->
|
info(Keys, State) when is_list(Keys) ->
|
||||||
process_flag(trap_exit, true),
|
[{Key, info(Key, State)} || Key <- Keys];
|
||||||
case Transport:wait(Sock) of
|
info(socktype, #state{transport = Transport, socket = Socket}) ->
|
||||||
{ok, NewSock} ->
|
Transport:type(Socket);
|
||||||
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]),
|
info(peername, #state{peername = Peername}) ->
|
||||||
ConnName = esockd:format(Peername),
|
Peername;
|
||||||
SendFun = {fun ?MODULE:send/4, [Transport, Sock, self()]},
|
info(sockname, #state{sockname = Sockname}) ->
|
||||||
HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Sock]},
|
Sockname;
|
||||||
Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
|
info(sockstate, #state{sockstate = SockSt}) ->
|
||||||
PState = emqx_stomp_protocol:init(#{peername => Peername,
|
SockSt;
|
||||||
sendfun => SendFun,
|
info(active_n, #state{active_n = ActiveN}) ->
|
||||||
heartfun => HrtBtFun}, ProtoEnv),
|
ActiveN.
|
||||||
RateLimit = init_rate_limit(proplists:get_value(rate_limit, ProtoEnv)),
|
|
||||||
State = run_socket(#state{transport = Transport,
|
-spec stats(pid() | state()) -> emqx_types:stats().
|
||||||
socket = NewSock,
|
stats(CPid) when is_pid(CPid) ->
|
||||||
peername = Peername,
|
call(CPid, stats);
|
||||||
conn_name = ConnName,
|
stats(#state{transport = Transport,
|
||||||
conn_state = running,
|
socket = Socket,
|
||||||
await_recv = false,
|
pstate = PState}) ->
|
||||||
rate_limit = RateLimit,
|
SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of
|
||||||
parser = Parser,
|
{ok, Ss} -> Ss;
|
||||||
proto_env = ProtoEnv,
|
{error, _} -> []
|
||||||
pstate = PState}),
|
end,
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
ConnStats = emqx_pd:get_counters(?CONN_STATS),
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, 5000}], State, 20000);
|
ChanStats = emqx_stomp_protocol:stats(PState),
|
||||||
|
ProcStats = emqx_misc:proc_stats(),
|
||||||
|
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||||
|
|
||||||
|
call(Pid, Req) ->
|
||||||
|
call(Pid, Req, infinity).
|
||||||
|
call(Pid, Req, Timeout) ->
|
||||||
|
gen_server:call(Pid, Req, Timeout).
|
||||||
|
|
||||||
|
init([Transport, RawSocket, ProtoEnv]) ->
|
||||||
|
case Transport:wait(RawSocket) of
|
||||||
|
{ok, Socket} ->
|
||||||
|
init_state(Transport, Socket, ProtoEnv);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{stop, Reason}
|
ok = Transport:fast_close(RawSocket),
|
||||||
|
exit_on_sock_error(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
init_rate_limit(undefined) ->
|
init_state(Transport, Socket, ProtoEnv) ->
|
||||||
undefined;
|
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
|
||||||
init_rate_limit({Rate, Burst}) ->
|
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
|
||||||
esockd_rate_limit:new(Rate, Burst).
|
|
||||||
|
|
||||||
send(Data, Transport, Sock, ConnPid) ->
|
SendFun = {fun ?MODULE:send/4, [Transport, Socket, self()]},
|
||||||
|
StatFun = {fun ?MODULE:statfun/3, [Transport, Socket]},
|
||||||
|
HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Socket]},
|
||||||
|
Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
|
||||||
|
|
||||||
|
ActiveN = proplists:get_value(active_n, ProtoEnv, ?ACTIVE_N),
|
||||||
|
GcState = emqx_gc:init(?DEFAULT_GC_POLICY),
|
||||||
|
|
||||||
|
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
|
||||||
|
ConnInfo = #{socktype => Transport:type(Socket),
|
||||||
|
peername => Peername,
|
||||||
|
sockname => Sockname,
|
||||||
|
peercert => Peercert,
|
||||||
|
statfun => StatFun,
|
||||||
|
sendfun => SendFun,
|
||||||
|
heartfun => HrtBtFun,
|
||||||
|
conn_mod => ?MODULE
|
||||||
|
},
|
||||||
|
PState = emqx_stomp_protocol:init(ConnInfo, ProtoEnv),
|
||||||
|
State = #state{transport = Transport,
|
||||||
|
socket = Socket,
|
||||||
|
peername = Peername,
|
||||||
|
sockname = Sockname,
|
||||||
|
sockstate = idle,
|
||||||
|
active_n = ActiveN,
|
||||||
|
limiter = undefined,
|
||||||
|
parser = Parser,
|
||||||
|
proto_env = ProtoEnv,
|
||||||
|
gc_state = GcState,
|
||||||
|
pstate = PState},
|
||||||
|
case activate_socket(State) of
|
||||||
|
{ok, NState} ->
|
||||||
|
emqx_logger:set_metadata_peername(
|
||||||
|
esockd:format(Peername)),
|
||||||
|
gen_server:enter_loop(
|
||||||
|
?MODULE, [{hibernate_after, 5000}], NState, 20000);
|
||||||
|
{error, Reason} ->
|
||||||
|
ok = Transport:fast_close(Socket),
|
||||||
|
exit_on_sock_error(Reason)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec exit_on_sock_error(any()) -> no_return().
|
||||||
|
exit_on_sock_error(Reason) when Reason =:= einval;
|
||||||
|
Reason =:= enotconn;
|
||||||
|
Reason =:= closed ->
|
||||||
|
erlang:exit(normal);
|
||||||
|
exit_on_sock_error(timeout) ->
|
||||||
|
erlang:exit({shutdown, ssl_upgrade_timeout});
|
||||||
|
exit_on_sock_error(Reason) ->
|
||||||
|
erlang:exit({shutdown, Reason}).
|
||||||
|
|
||||||
|
send(Frame, Transport, Sock, ConnPid) ->
|
||||||
|
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
||||||
|
ok = inc_outgoing_stats(Frame),
|
||||||
|
Data = emqx_stomp_frame:serialize(Frame),
|
||||||
|
?LOG(debug, "SEND ~p", [Data]),
|
||||||
try Transport:async_send(Sock, Data) of
|
try Transport:async_send(Sock, Data) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> ConnPid ! {shutdown, Reason}
|
{error, Reason} -> ConnPid ! {shutdown, Reason}
|
||||||
|
@ -95,23 +222,27 @@ send(Data, Transport, Sock, ConnPid) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
heartbeat(Transport, Sock) ->
|
heartbeat(Transport, Sock) ->
|
||||||
|
?LOG(debug, "SEND heartbeat: \\n"),
|
||||||
Transport:send(Sock, <<$\n>>).
|
Transport:send(Sock, <<$\n>>).
|
||||||
|
|
||||||
handle_call(info, _From, State = #state{transport = Transport,
|
statfun(Stat, Transport, Sock) ->
|
||||||
socket = Sock,
|
case Transport:getstat(Sock, [Stat]) of
|
||||||
peername = Peername,
|
{ok, [{Stat, Val}]} -> {ok, Val};
|
||||||
await_recv = AwaitRecv,
|
{error, Error} -> {error, Error}
|
||||||
conn_state = ConnState,
|
end.
|
||||||
pstate = PState}) ->
|
|
||||||
ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv},
|
handle_call(info, _From, State) ->
|
||||||
{conn_state, ConnState}],
|
{reply, info(State), State};
|
||||||
ProtoInfo = emqx_stomp_protocol:info(PState),
|
|
||||||
case Transport:getstat(Sock, ?SOCK_STATS) of
|
handle_call(stats, _From, State) ->
|
||||||
{ok, SockStats} ->
|
{reply, stats(State), State};
|
||||||
{reply, lists:append([ClientInfo, ProtoInfo, SockStats]), State};
|
|
||||||
{error, Reason} ->
|
handle_call(discard, _From, State) ->
|
||||||
{stop, Reason, lists:append([ClientInfo, ProtoInfo]), State}
|
%% TODO: send the DISCONNECT packet?
|
||||||
end;
|
shutdown_and_reply(discared, ok, State);
|
||||||
|
|
||||||
|
handle_call(kick, _From, State) ->
|
||||||
|
shutdown_and_reply(kicked, ok, State);
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "unexpected request: ~p", [Req]),
|
?LOG(error, "unexpected request: ~p", [Req]),
|
||||||
|
@ -121,6 +252,13 @@ handle_cast(Msg, State) ->
|
||||||
?LOG(error, "unexpected msg: ~p", [Msg]),
|
?LOG(error, "unexpected msg: ~p", [Msg]),
|
||||||
noreply(State).
|
noreply(State).
|
||||||
|
|
||||||
|
handle_info({event, Name}, State = #state{pstate = PState})
|
||||||
|
when Name == connected;
|
||||||
|
Name == updated ->
|
||||||
|
ClientId = emqx_stomp_protocol:info(clientid, PState),
|
||||||
|
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
|
||||||
|
noreply(State);
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
shutdown(idle_timeout, State);
|
shutdown(idle_timeout, State);
|
||||||
|
|
||||||
|
@ -141,26 +279,70 @@ handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming;
|
||||||
shutdown({sock_error, Reason}, State)
|
shutdown({sock_error, Reason}, State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
handle_info({timeout, _TRef, limit_timeout}, State) ->
|
||||||
|
NState = State#state{sockstate = idle,
|
||||||
|
limit_timer = undefined
|
||||||
|
},
|
||||||
|
handle_info(activate_socket, NState);
|
||||||
|
|
||||||
|
handle_info({timeout, _TRef, emit_stats},
|
||||||
|
State = #state{pstate = PState}) ->
|
||||||
|
ClientId = emqx_stomp_protocol:info(clientid, PState),
|
||||||
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
|
noreply(State#state{stats_timer = undefined});
|
||||||
|
|
||||||
handle_info({timeout, TRef, TMsg}, State) ->
|
handle_info({timeout, TRef, TMsg}, State) ->
|
||||||
with_proto(timeout, [TRef, TMsg], State);
|
with_proto(timeout, [TRef, TMsg], State);
|
||||||
|
|
||||||
handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) ->
|
handle_info(activate_socket, State) ->
|
||||||
stop(Error, State);
|
case activate_socket(State) of
|
||||||
|
{ok, NState} ->
|
||||||
handle_info(activate_sock, State) ->
|
noreply(NState);
|
||||||
noreply(run_socket(State#state{conn_state = running}));
|
{error, Reason} ->
|
||||||
|
handle_info({sock_error, Reason}, State)
|
||||||
handle_info({inet_async, _Sock, _Ref, {ok, Bytes}}, State) ->
|
end;
|
||||||
?LOG(debug, "RECV ~p", [Bytes]),
|
|
||||||
received(Bytes, rate_limit(size(Bytes), State#state{await_recv = false}));
|
|
||||||
|
|
||||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|
||||||
shutdown(Reason, State);
|
|
||||||
|
|
||||||
handle_info({inet_reply, _Ref, ok}, State) ->
|
handle_info({inet_reply, _Ref, ok}, State) ->
|
||||||
noreply(State);
|
noreply(State);
|
||||||
|
|
||||||
|
handle_info({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
|
?LOG(debug, "RECV ~0p", [Data]),
|
||||||
|
Oct = iolist_size(Data),
|
||||||
|
inc_counter(incoming_bytes, Oct),
|
||||||
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
|
received(Data, ensure_stats_timer(?IDLE_TIMEOUT, State));
|
||||||
|
|
||||||
|
handle_info({Passive, _Sock}, State)
|
||||||
|
when Passive == tcp_passive; Passive == ssl_passive ->
|
||||||
|
%% In Stats
|
||||||
|
Pubs = emqx_pd:reset_counter(incoming_pubs),
|
||||||
|
Bytes = emqx_pd:reset_counter(incoming_bytes),
|
||||||
|
InStats = #{cnt => Pubs, oct => Bytes},
|
||||||
|
%% Ensure Rate Limit
|
||||||
|
NState = ensure_rate_limit(InStats, State),
|
||||||
|
%% Run GC and Check OOM
|
||||||
|
NState1 = check_oom(run_gc(InStats, NState)),
|
||||||
|
handle_info(activate_socket, NState1);
|
||||||
|
|
||||||
|
handle_info({Error, _Sock, Reason}, State)
|
||||||
|
when Error == tcp_error; Error == ssl_error ->
|
||||||
|
handle_info({sock_error, Reason}, State);
|
||||||
|
|
||||||
|
handle_info({Closed, _Sock}, State)
|
||||||
|
when Closed == tcp_closed; Closed == ssl_closed ->
|
||||||
|
handle_info({sock_closed, Closed}, close_socket(State));
|
||||||
|
|
||||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
|
handle_info({sock_error, Reason}, State);
|
||||||
|
|
||||||
|
handle_info({sock_error, Reason}, State) ->
|
||||||
|
case Reason =/= closed andalso Reason =/= einval of
|
||||||
|
true -> ?LOG(warning, "socket_error: ~p", [Reason]);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
|
handle_info({sock_closed, Reason}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) ->
|
handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) ->
|
||||||
|
@ -172,8 +354,7 @@ handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) ->
|
||||||
end});
|
end});
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
with_proto(handle_info, [Info], State).
|
||||||
noreply(State).
|
|
||||||
|
|
||||||
terminate(Reason, #state{transport = Transport,
|
terminate(Reason, #state{transport = Transport,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
|
@ -197,6 +378,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
with_proto(Fun, Args, State = #state{pstate = PState}) ->
|
with_proto(Fun, Args, State = #state{pstate = PState}) ->
|
||||||
case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of
|
case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of
|
||||||
|
ok ->
|
||||||
|
noreply(State);
|
||||||
{ok, NPState} ->
|
{ok, NPState} ->
|
||||||
noreply(State#state{pstate = NPState});
|
noreply(State#state{pstate = NPState});
|
||||||
{F, Reason, NPState} when F == stop;
|
{F, Reason, NPState} when F == stop;
|
||||||
|
@ -215,6 +398,7 @@ received(Bytes, State = #state{parser = Parser,
|
||||||
noreply(State#state{parser = NewParser});
|
noreply(State#state{parser = NewParser});
|
||||||
{ok, Frame, Rest} ->
|
{ok, Frame, Rest} ->
|
||||||
?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
||||||
|
ok = inc_incoming_stats(Frame),
|
||||||
case emqx_stomp_protocol:received(Frame, PState) of
|
case emqx_stomp_protocol:received(Frame, PState) of
|
||||||
{ok, PState1} ->
|
{ok, PState1} ->
|
||||||
received(Rest, reset_parser(State#state{pstate = PState1}));
|
received(Rest, reset_parser(State#state{pstate = PState1}));
|
||||||
|
@ -237,25 +421,97 @@ received(Bytes, State = #state{parser = Parser,
|
||||||
reset_parser(State = #state{proto_env = ProtoEnv}) ->
|
reset_parser(State = #state{proto_env = ProtoEnv}) ->
|
||||||
State#state{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}.
|
State#state{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}.
|
||||||
|
|
||||||
rate_limit(_Size, State = #state{rate_limit = undefined}) ->
|
activate_socket(State = #state{sockstate = closed}) ->
|
||||||
run_socket(State);
|
{ok, State};
|
||||||
rate_limit(Size, State = #state{rate_limit = Rl}) ->
|
activate_socket(State = #state{sockstate = blocked}) ->
|
||||||
case esockd_rate_limit:check(Size, Rl) of
|
{ok, State};
|
||||||
{0, Rl1} ->
|
activate_socket(State = #state{transport = Transport,
|
||||||
run_socket(State#state{conn_state = running, rate_limit = Rl1});
|
socket = Socket,
|
||||||
{Pause, Rl1} ->
|
active_n = N}) ->
|
||||||
?LOG(error, "Rate limiter pause for ~p", [Pause]),
|
case Transport:setopts(Socket, [{active, N}]) of
|
||||||
erlang:send_after(Pause, self(), activate_sock),
|
ok -> {ok, State#state{sockstate = running}};
|
||||||
State#state{conn_state = blocked, rate_limit = Rl1}
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_socket(State = #state{conn_state = blocked}) ->
|
close_socket(State = #state{sockstate = closed}) -> State;
|
||||||
State;
|
close_socket(State = #state{transport = Transport, socket = Socket}) ->
|
||||||
run_socket(State = #state{await_recv = true}) ->
|
ok = Transport:fast_close(Socket),
|
||||||
State;
|
State#state{sockstate = closed}.
|
||||||
run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
|
||||||
Transport:async_recv(Sock, 0, infinity),
|
%%--------------------------------------------------------------------
|
||||||
State#state{await_recv = true}.
|
%% Inc incoming/outgoing stats
|
||||||
|
|
||||||
|
inc_incoming_stats(#stomp_frame{command = Cmd}) ->
|
||||||
|
inc_counter(recv_pkt, 1),
|
||||||
|
case Cmd of
|
||||||
|
<<"SEND">> ->
|
||||||
|
inc_counter(recv_msg, 1),
|
||||||
|
inc_counter(incoming_pubs, 1),
|
||||||
|
emqx_metrics:inc('messages.received'),
|
||||||
|
emqx_metrics:inc('messages.qos1.received');
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
emqx_metrics:inc('packets.received').
|
||||||
|
|
||||||
|
inc_outgoing_stats(#stomp_frame{command = Cmd}) ->
|
||||||
|
inc_counter(send_pkt, 1),
|
||||||
|
case Cmd of
|
||||||
|
<<"MESSAGE">> ->
|
||||||
|
inc_counter(send_msg, 1),
|
||||||
|
inc_counter(outgoing_pubs, 1),
|
||||||
|
emqx_metrics:inc('messages.sent'),
|
||||||
|
emqx_metrics:inc('messages.qos1.sent');
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
emqx_metrics:inc('packets.sent').
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Ensure rate limit
|
||||||
|
|
||||||
|
ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
||||||
|
case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
|
||||||
|
false -> State;
|
||||||
|
{ok, Limiter1} ->
|
||||||
|
State#state{limiter = Limiter1};
|
||||||
|
{pause, Time, Limiter1} ->
|
||||||
|
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
|
||||||
|
TRef = start_timer(Time, limit_timeout),
|
||||||
|
State#state{sockstate = blocked,
|
||||||
|
limiter = Limiter1,
|
||||||
|
limit_timer = TRef
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Run GC and Check OOM
|
||||||
|
|
||||||
|
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||||
|
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||||
|
false -> State;
|
||||||
|
{_IsGC, GcSt1} ->
|
||||||
|
State#state{gc_state = GcSt1}
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_oom(State) ->
|
||||||
|
OomPolicy = ?DEFAULT_OOM_POLICY,
|
||||||
|
?tp(debug, check_oom, #{policy => OomPolicy}),
|
||||||
|
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
|
||||||
|
{shutdown, Reason} ->
|
||||||
|
%% triggers terminate/2 callback immediately
|
||||||
|
erlang:exit({shutdown, Reason});
|
||||||
|
_Other ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
State.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Ensure/cancel stats timer
|
||||||
|
|
||||||
|
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
|
||||||
|
State#state{stats_timer = start_timer(Timeout, emit_stats)};
|
||||||
|
ensure_stats_timer(_Timeout, State) -> State.
|
||||||
|
|
||||||
getstat(Stat, #state{transport = Transport, socket = Sock}) ->
|
getstat(Stat, #state{transport = Transport, socket = Sock}) ->
|
||||||
case Transport:getstat(Sock, [Stat]) of
|
case Transport:getstat(Sock, [Stat]) of
|
||||||
|
@ -272,3 +528,9 @@ stop(Reason, State) ->
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
stop({shutdown, Reason}, State).
|
stop({shutdown, Reason}, State).
|
||||||
|
|
||||||
|
shutdown_and_reply(Reason, Reply, State) ->
|
||||||
|
{stop, {shutdown, Reason}, Reply, State}.
|
||||||
|
|
||||||
|
inc_counter(Key, Inc) ->
|
||||||
|
_ = emqx_pd:inc_counter(Key, Inc),
|
||||||
|
ok.
|
||||||
|
|
|
@ -126,6 +126,13 @@ parse(Bytes, #{phase := body, len := Len, state := State}) ->
|
||||||
|
|
||||||
parse(Bytes, Parser = #{pre := Pre}) ->
|
parse(Bytes, Parser = #{pre := Pre}) ->
|
||||||
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
|
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
|
||||||
|
parse(<<?CR, Rest/binary>>, Parser = #{phase := none}) ->
|
||||||
|
parse(Rest, Parser);
|
||||||
|
parse(<<?LF, Rest/binary>>, Parser = #{phase := none}) ->
|
||||||
|
case byte_size(Rest) of
|
||||||
|
0 -> {more, Parser};
|
||||||
|
_ -> parse(Rest, Parser)
|
||||||
|
end;
|
||||||
parse(<<?CR, ?LF, Rest/binary>>, #{phase := Phase, state := State}) ->
|
parse(<<?CR, ?LF, Rest/binary>>, #{phase := Phase, state := State}) ->
|
||||||
parse(Phase, <<?LF, Rest/binary>>, State);
|
parse(Phase, <<?LF, Rest/binary>>, State);
|
||||||
parse(<<?CR>>, Parser) ->
|
parse(<<?CR>>, Parser) ->
|
||||||
|
|
|
@ -23,9 +23,10 @@
|
||||||
, check/3
|
, check/3
|
||||||
, info/1
|
, info/1
|
||||||
, interval/2
|
, interval/2
|
||||||
|
, reset/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(heartbeater, {interval, statval, repeat}).
|
-record(heartbeater, {interval, statval, repeat, repeat_max}).
|
||||||
|
|
||||||
-type name() :: incoming | outgoing.
|
-type name() :: incoming | outgoing.
|
||||||
|
|
||||||
|
@ -33,7 +34,6 @@
|
||||||
outgoing => #heartbeater{}
|
outgoing => #heartbeater{}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -43,19 +43,23 @@ init({0, 0}) ->
|
||||||
#{};
|
#{};
|
||||||
init({Cx, Cy}) ->
|
init({Cx, Cy}) ->
|
||||||
maps:filter(fun(_, V) -> V /= undefined end,
|
maps:filter(fun(_, V) -> V /= undefined end,
|
||||||
#{incoming => heartbeater(Cx),
|
#{incoming => heartbeater(incoming, Cx),
|
||||||
outgoing => heartbeater(Cy)
|
outgoing => heartbeater(outgoing, Cy)
|
||||||
}).
|
}).
|
||||||
|
|
||||||
heartbeater(0) ->
|
heartbeater(_, 0) ->
|
||||||
undefined;
|
undefined;
|
||||||
heartbeater(I) ->
|
heartbeater(N, I) ->
|
||||||
#heartbeater{
|
#heartbeater{
|
||||||
interval = I,
|
interval = I,
|
||||||
statval = 0,
|
statval = 0,
|
||||||
repeat = 0
|
repeat = 0,
|
||||||
|
repeat_max = repeat_max(N)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
repeat_max(incoming) -> 1;
|
||||||
|
repeat_max(outgoing) -> 0.
|
||||||
|
|
||||||
-spec check(name(), pos_integer(), heartbeat())
|
-spec check(name(), pos_integer(), heartbeat())
|
||||||
-> {ok, heartbeat()}
|
-> {ok, heartbeat()}
|
||||||
| {error, timeout}.
|
| {error, timeout}.
|
||||||
|
@ -68,11 +72,12 @@ check(Name, NewVal, HrtBt) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check(NewVal, HrtBter = #heartbeater{statval = OldVal,
|
check(NewVal, HrtBter = #heartbeater{statval = OldVal,
|
||||||
repeat = Repeat}) ->
|
repeat = Repeat,
|
||||||
|
repeat_max = Max}) ->
|
||||||
if
|
if
|
||||||
NewVal =/= OldVal ->
|
NewVal =/= OldVal ->
|
||||||
{ok, HrtBter#heartbeater{statval = NewVal, repeat = 0}};
|
{ok, HrtBter#heartbeater{statval = NewVal, repeat = 0}};
|
||||||
Repeat < 1 ->
|
Repeat < Max ->
|
||||||
{ok, HrtBter#heartbeater{repeat = Repeat + 1}};
|
{ok, HrtBter#heartbeater{repeat = Repeat + 1}};
|
||||||
true -> {error, timeout}
|
true -> {error, timeout}
|
||||||
end.
|
end.
|
||||||
|
@ -90,3 +95,10 @@ interval(Type, HrtBt) ->
|
||||||
undefined -> undefined;
|
undefined -> undefined;
|
||||||
#heartbeater{interval = Intv} -> Intv
|
#heartbeater{interval = Intv} -> Intv
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
reset(Type, StatVal, HrtBt) ->
|
||||||
|
case maps:get(Type, HrtBt, undefined) of
|
||||||
|
undefined -> HrtBt;
|
||||||
|
HrtBter ->
|
||||||
|
HrtBt#{Type => HrtBter#heartbeater{statval = StatVal, repeat = 0}}
|
||||||
|
end.
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-include("emqx_stomp.hrl").
|
-include("emqx_stomp.hrl").
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
@ -30,6 +31,8 @@
|
||||||
%% API
|
%% API
|
||||||
-export([ init/2
|
-export([ init/2
|
||||||
, info/1
|
, info/1
|
||||||
|
, info/2
|
||||||
|
, stats/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ received/2
|
-export([ received/2
|
||||||
|
@ -38,6 +41,9 @@
|
||||||
, timeout/3
|
, timeout/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ handle_info/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% for trans callback
|
%% for trans callback
|
||||||
-export([ handle_recv_send_frame/2
|
-export([ handle_recv_send_frame/2
|
||||||
, handle_recv_ack_frame/2
|
, handle_recv_ack_frame/2
|
||||||
|
@ -45,21 +51,37 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(pstate, {
|
-record(pstate, {
|
||||||
peername,
|
%% Stomp ConnInfo
|
||||||
heartfun,
|
conninfo :: emqx_types:conninfo(),
|
||||||
sendfun,
|
%% Stomp ClientInfo
|
||||||
|
clientinfo :: emqx_types:clientinfo(),
|
||||||
|
%% Stomp Heartbeats
|
||||||
|
heart_beats :: maybe(emqx_stomp_hearbeat:heartbeat()),
|
||||||
|
%% Stomp Connection State
|
||||||
connected = false,
|
connected = false,
|
||||||
proto_ver,
|
%% Timers
|
||||||
proto_name,
|
|
||||||
heart_beats,
|
|
||||||
login,
|
|
||||||
allow_anonymous,
|
|
||||||
default_user,
|
|
||||||
subscriptions = [],
|
|
||||||
timers :: #{atom() => disable | undefined | reference()},
|
timers :: #{atom() => disable | undefined | reference()},
|
||||||
transaction :: #{binary() => list()}
|
%% Transaction
|
||||||
|
transaction :: #{binary() => list()},
|
||||||
|
%% Subscriptions
|
||||||
|
subscriptions = #{},
|
||||||
|
%% Send function
|
||||||
|
sendfun :: {function(), list()},
|
||||||
|
%% Heartbeat function
|
||||||
|
heartfun :: {function(), list()},
|
||||||
|
%% Get Socket stat function
|
||||||
|
statfun :: {function(), list()},
|
||||||
|
%% The confs for the connection
|
||||||
|
%% TODO: put these configs into a public mem?
|
||||||
|
allow_anonymous :: maybe(boolean()),
|
||||||
|
default_user :: maybe(list())
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(DEFAULT_SUB_ACK, <<"auto">>).
|
||||||
|
|
||||||
|
-define(INCOMING_TIMER_BACKOFF, 1.25).
|
||||||
|
-define(OUTCOMING_TIMER_BACKOFF, 0.75).
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
incoming_timer => incoming,
|
incoming_timer => incoming,
|
||||||
outgoing_timer => outgoing,
|
outgoing_timer => outgoing,
|
||||||
|
@ -68,34 +90,135 @@
|
||||||
|
|
||||||
-define(TRANS_TIMEOUT, 60000).
|
-define(TRANS_TIMEOUT, 60000).
|
||||||
|
|
||||||
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
||||||
|
|
||||||
|
-define(STATS_KEYS, [subscriptions_cnt,
|
||||||
|
subscriptions_max,
|
||||||
|
inflight_cnt,
|
||||||
|
inflight_max,
|
||||||
|
mqueue_len,
|
||||||
|
mqueue_max,
|
||||||
|
mqueue_dropped,
|
||||||
|
next_pkt_id,
|
||||||
|
awaiting_rel_cnt,
|
||||||
|
awaiting_rel_max
|
||||||
|
]).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, [ check_acl/3
|
||||||
|
, init/2
|
||||||
|
]}).
|
||||||
|
|
||||||
-type(pstate() :: #pstate{}).
|
-type(pstate() :: #pstate{}).
|
||||||
|
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
init(#{peername := Peername,
|
init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||||
|
sockname := {_Host, SockPort},
|
||||||
|
statfun := StatFun,
|
||||||
sendfun := SendFun,
|
sendfun := SendFun,
|
||||||
heartfun := HeartFun}, Env) ->
|
heartfun := HeartFun}, Opts) ->
|
||||||
AllowAnonymous = get_value(allow_anonymous, Env, false),
|
|
||||||
DefaultUser = get_value(default_user, Env),
|
NConnInfo = default_conninfo(ConnInfo),
|
||||||
#pstate{peername = Peername,
|
|
||||||
|
ClientInfo = #{zone => undefined,
|
||||||
|
protocol => stomp,
|
||||||
|
peerhost => PeerHost,
|
||||||
|
sockport => SockPort,
|
||||||
|
clientid => undefined,
|
||||||
|
username => undefined,
|
||||||
|
mountpoint => undefined, %% XXX: not supported now
|
||||||
|
is_bridge => false,
|
||||||
|
is_superuser => false
|
||||||
|
},
|
||||||
|
|
||||||
|
AllowAnonymous = get_value(allow_anonymous, Opts, false),
|
||||||
|
DefaultUser = get_value(default_user, Opts),
|
||||||
|
|
||||||
|
#pstate{
|
||||||
|
conninfo = NConnInfo,
|
||||||
|
clientinfo = ClientInfo,
|
||||||
heartfun = HeartFun,
|
heartfun = HeartFun,
|
||||||
sendfun = SendFun,
|
sendfun = SendFun,
|
||||||
|
statfun = StatFun,
|
||||||
timers = #{},
|
timers = #{},
|
||||||
transaction = #{},
|
transaction = #{},
|
||||||
allow_anonymous = AllowAnonymous,
|
allow_anonymous = AllowAnonymous,
|
||||||
default_user = DefaultUser}.
|
default_user = DefaultUser
|
||||||
|
}.
|
||||||
|
|
||||||
info(#pstate{connected = Connected,
|
default_conninfo(ConnInfo) ->
|
||||||
proto_ver = ProtoVer,
|
NConnInfo = maps:without([sendfun, heartfun], ConnInfo),
|
||||||
proto_name = ProtoName,
|
NConnInfo#{
|
||||||
heart_beats = Heartbeats,
|
proto_name => <<"STOMP">>,
|
||||||
login = Login,
|
proto_ver => <<"1.2">>,
|
||||||
subscriptions = Subscriptions}) ->
|
clean_start => true,
|
||||||
[{connected, Connected},
|
clientid => undefined,
|
||||||
{proto_ver, ProtoVer},
|
username => undefined,
|
||||||
{proto_name, ProtoName},
|
conn_props => [],
|
||||||
{heart_beats, Heartbeats},
|
connected => false,
|
||||||
{login, Login},
|
connected_at => undefined,
|
||||||
{subscriptions, Subscriptions}].
|
keepalive => undefined,
|
||||||
|
receive_maximum => 0,
|
||||||
|
expiry_interval => 0
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec info(pstate()) -> emqx_types:infos().
|
||||||
|
info(State) ->
|
||||||
|
maps:from_list(info(?INFO_KEYS, State)).
|
||||||
|
|
||||||
|
-spec info(list(atom())|atom(), pstate()) -> term().
|
||||||
|
info(Keys, State) when is_list(Keys) ->
|
||||||
|
[{Key, info(Key, State)} || Key <- Keys];
|
||||||
|
info(conninfo, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
ConnInfo;
|
||||||
|
info(socktype, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(socktype, ConnInfo, undefined);
|
||||||
|
info(peername, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(peername, ConnInfo, undefined);
|
||||||
|
info(sockname, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(sockname, ConnInfo, undefined);
|
||||||
|
info(proto_name, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(proto_name, ConnInfo, undefined);
|
||||||
|
info(proto_ver, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(proto_ver, ConnInfo, undefined);
|
||||||
|
info(connected_at, #pstate{conninfo = ConnInfo}) ->
|
||||||
|
maps:get(connected_at, ConnInfo, undefined);
|
||||||
|
info(clientinfo, #pstate{clientinfo = ClientInfo}) ->
|
||||||
|
ClientInfo;
|
||||||
|
info(zone, _) ->
|
||||||
|
undefined;
|
||||||
|
info(clientid, #pstate{clientinfo = ClientInfo}) ->
|
||||||
|
maps:get(clientid, ClientInfo, undefined);
|
||||||
|
info(username, #pstate{clientinfo = ClientInfo}) ->
|
||||||
|
maps:get(username, ClientInfo, undefined);
|
||||||
|
info(session, State) ->
|
||||||
|
session_info(State);
|
||||||
|
info(conn_state, #pstate{connected = true}) ->
|
||||||
|
connected;
|
||||||
|
info(conn_state, _) ->
|
||||||
|
disconnected;
|
||||||
|
info(will_msg, _) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) ->
|
||||||
|
#{subscriptions => Subs,
|
||||||
|
upgrade_qos => false,
|
||||||
|
retry_interval => 0,
|
||||||
|
await_rel_timeout => 0,
|
||||||
|
created_at => maps:get(connected_at, ConnInfo, 0)
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec stats(pstate()) -> emqx_types:stats().
|
||||||
|
stats(#pstate{subscriptions = Subs}) ->
|
||||||
|
[{subscriptions_cnt, maps:size(Subs)},
|
||||||
|
{subscriptions_max, 0},
|
||||||
|
{inflight_cnt, 0},
|
||||||
|
{inflight_max, 0},
|
||||||
|
{mqueue_len, 0},
|
||||||
|
{mqueue_max, 0},
|
||||||
|
{mqueue_dropped, 0},
|
||||||
|
{next_pkt_id, 0},
|
||||||
|
{awaiting_rel_cnt, 0},
|
||||||
|
{awaiting_rel_max, 0}].
|
||||||
|
|
||||||
-spec(received(stomp_frame(), pstate())
|
-spec(received(stomp_frame(), pstate())
|
||||||
-> {ok, pstate()}
|
-> {ok, pstate()}
|
||||||
|
@ -105,20 +228,49 @@ received(Frame = #stomp_frame{command = <<"STOMP">>}, State) ->
|
||||||
received(Frame#stomp_frame{command = <<"CONNECT">>}, State);
|
received(Frame#stomp_frame{command = <<"CONNECT">>}, State);
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
||||||
State = #pstate{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) ->
|
State = #pstate{connected = false}) ->
|
||||||
case negotiate_version(header(<<"accept-version">>, Headers)) of
|
case negotiate_version(header(<<"accept-version">>, Headers)) of
|
||||||
{ok, Version} ->
|
{ok, Version} ->
|
||||||
Login = header(<<"login">>, Headers),
|
Login = header(<<"login">>, Headers),
|
||||||
Passc = header(<<"passcode">>, Headers),
|
Passc = header(<<"passcode">>, Headers),
|
||||||
case check_login(Login, Passc, AllowAnonymous, DefaultUser) of
|
case check_login(Login, Passc,
|
||||||
|
allow_anonymous(State),
|
||||||
|
default_user(State)
|
||||||
|
) of
|
||||||
true ->
|
true ->
|
||||||
emqx_logger:set_metadata_clientid(Login),
|
Heartbeats = parse_heartbeats(
|
||||||
|
header(<<"heart-beat">>, Headers, <<"0,0">>)),
|
||||||
|
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
|
emqx_logger:set_metadata_clientid(ClientId),
|
||||||
|
ConnInfo = State#pstate.conninfo,
|
||||||
|
ClitInfo = State#pstate.clientinfo,
|
||||||
|
NConnInfo = ConnInfo#{
|
||||||
|
proto_ver => Version,
|
||||||
|
clientid => ClientId,
|
||||||
|
keepalive => element(1, Heartbeats) div 1000,
|
||||||
|
username => Login
|
||||||
|
},
|
||||||
|
NClitInfo = ClitInfo#{
|
||||||
|
clientid => ClientId,
|
||||||
|
username => Login
|
||||||
|
},
|
||||||
|
|
||||||
Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
|
ConnPid = self(),
|
||||||
NState = start_heartbeart_timer(Heartbeats, State#pstate{connected = true,
|
_ = emqx_cm_locker:trans(ClientId, fun(_) ->
|
||||||
proto_ver = Version, login = Login}),
|
emqx_cm:discard_session(ClientId),
|
||||||
send(connected_frame([{<<"version">>, Version},
|
emqx_cm:register_channel(ClientId, ConnPid, NConnInfo)
|
||||||
{<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState);
|
end),
|
||||||
|
NState = start_heartbeart_timer(
|
||||||
|
Heartbeats,
|
||||||
|
State#pstate{
|
||||||
|
conninfo = NConnInfo,
|
||||||
|
clientinfo = NClitInfo}
|
||||||
|
),
|
||||||
|
ConnectedFrame = connected_frame(
|
||||||
|
[{<<"version">>, Version},
|
||||||
|
{<<"heart-beat">>, reverse_heartbeats(Heartbeats)}
|
||||||
|
]),
|
||||||
|
send(ConnectedFrame, ensure_connected(NState));
|
||||||
false ->
|
false ->
|
||||||
_ = send(error_frame(undefined, <<"Login or passcode error!">>), State),
|
_ = send(error_frame(undefined, <<"Login or passcode error!">>), State),
|
||||||
{error, login_or_passcode_error, State}
|
{error, login_or_passcode_error, State}
|
||||||
|
@ -130,6 +282,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) ->
|
received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) ->
|
||||||
|
?LOG(error, "Received CONNECT frame on connected=true state"),
|
||||||
{error, unexpected_connect, State};
|
{error, unexpected_connect, State};
|
||||||
|
|
||||||
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
||||||
|
@ -139,31 +292,51 @@ received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) -
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||||
State = #pstate{subscriptions = Subscriptions}) ->
|
State = #pstate{subscriptions = Subs}) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
Ack = header(<<"ack">>, Headers, <<"auto">>),
|
Ack = header(<<"ack">>, Headers, ?DEFAULT_SUB_ACK),
|
||||||
{ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of
|
|
||||||
{Id, Topic, Ack} ->
|
case find_sub_by_id(Id, Subs) of
|
||||||
{ok, State};
|
{Topic, #{sub_props := #{id := Id}}} ->
|
||||||
false ->
|
?LOG(info, "Subscription has established: ~s", [Topic]),
|
||||||
emqx_broker:subscribe(Topic),
|
maybe_send_receipt(receipt_id(Headers), State);
|
||||||
{ok, State#pstate{subscriptions = [{Id, Topic, Ack}|Subscriptions]}}
|
{InuseTopic, #{sub_props := #{id := InuseId}}} ->
|
||||||
end,
|
?LOG(info, "Subscription id ~p inused by topic: ~s, "
|
||||||
maybe_send_receipt(receipt_id(Headers), State1);
|
"request topic: ~s", [InuseId, InuseTopic, Topic]),
|
||||||
|
send(error_frame(receipt_id(Headers),
|
||||||
|
["Request sub-id ", Id, " inused "]), State);
|
||||||
|
undefined ->
|
||||||
|
case check_acl(subscribe, Topic, State) of
|
||||||
|
allow ->
|
||||||
|
ClientInfo = State#pstate.clientinfo,
|
||||||
|
|
||||||
|
[{TopicFilter, SubOpts}] = parse_topic_filters(
|
||||||
|
[{Topic, ?DEFAULT_SUBOPTS}
|
||||||
|
]),
|
||||||
|
NSubOpts = SubOpts#{sub_props => #{id => Id, ack => Ack}},
|
||||||
|
_ = run_hooks('client.subscribe',
|
||||||
|
[ClientInfo, _SubProps = #{}],
|
||||||
|
[{TopicFilter, NSubOpts}]),
|
||||||
|
NState = do_subscribe(TopicFilter, NSubOpts, State),
|
||||||
|
maybe_send_receipt(receipt_id(Headers), NState)
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
||||||
State = #pstate{subscriptions = Subscriptions}) ->
|
State = #pstate{subscriptions = Subs, clientinfo = ClientInfo}) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
|
{ok, NState} = case find_sub_by_id(Id, Subs) of
|
||||||
{ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of
|
{Topic, #{sub_props := #{id := Id}}} ->
|
||||||
{Id, Topic, _Ack} ->
|
_ = run_hooks('client.unsubscribe',
|
||||||
ok = emqx_broker:unsubscribe(Topic),
|
[ClientInfo, #{}],
|
||||||
{ok, State#pstate{subscriptions = lists:keydelete(Id, 1, Subscriptions)}};
|
[{Topic, #{}}]),
|
||||||
false ->
|
State1 = do_unsubscribe(Topic, ?DEFAULT_SUBOPTS, State),
|
||||||
|
{ok, State1};
|
||||||
|
undefined ->
|
||||||
{ok, State}
|
{ok, State}
|
||||||
end,
|
end,
|
||||||
maybe_send_receipt(receipt_id(Headers), State1);
|
maybe_send_receipt(receipt_id(Headers), NState);
|
||||||
|
|
||||||
%% ACK
|
%% ACK
|
||||||
%% id:12345
|
%% id:12345
|
||||||
|
@ -239,10 +412,15 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) ->
|
||||||
_ = maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
{stop, normal, State}.
|
{stop, normal, State}.
|
||||||
|
|
||||||
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
send(Msg0 = #message{},
|
||||||
State = #pstate{subscriptions = Subscriptions}) ->
|
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
|
||||||
case lists:keyfind(Topic, 2, Subscriptions) of
|
ok = emqx_metrics:inc('messages.delivered'),
|
||||||
{Id, Topic, Ack} ->
|
Msg = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg0),
|
||||||
|
#message{topic = Topic,
|
||||||
|
headers = Headers,
|
||||||
|
payload = Payload} = Msg,
|
||||||
|
case find_sub_by_topic(Topic, Subs) of
|
||||||
|
{Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
|
||||||
Headers0 = [{<<"subscription">>, Id},
|
Headers0 = [{<<"subscription">>, Id},
|
||||||
{<<"message-id">>, next_msgid()},
|
{<<"message-id">>, next_msgid()},
|
||||||
{<<"destination">>, Topic},
|
{<<"destination">>, Topic},
|
||||||
|
@ -256,19 +434,21 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
||||||
Frame = #stomp_frame{command = <<"MESSAGE">>,
|
Frame = #stomp_frame{command = <<"MESSAGE">>,
|
||||||
headers = Headers1 ++ maps:get(stomp_headers, Headers, []),
|
headers = Headers1 ++ maps:get(stomp_headers, Headers, []),
|
||||||
body = Payload},
|
body = Payload},
|
||||||
|
|
||||||
|
|
||||||
send(Frame, State);
|
send(Frame, State);
|
||||||
false ->
|
undefined ->
|
||||||
?LOG(error, "Stomp dropped: ~p", [Msg]),
|
?LOG(error, "Stomp dropped: ~p", [Msg]),
|
||||||
{error, dropped, State}
|
{error, dropped, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
send(Frame, State = #pstate{sendfun = {Fun, Args}}) ->
|
send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) ->
|
||||||
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
erlang:apply(Fun, [Frame] ++ Args),
|
||||||
Data = emqx_stomp_frame:serialize(Frame),
|
|
||||||
?LOG(debug, "SEND ~p", [Data]),
|
|
||||||
erlang:apply(Fun, [Data] ++ Args),
|
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
shutdown(Reason, State = #pstate{connected = true}) ->
|
||||||
|
_ = ensure_disconnected(Reason, State),
|
||||||
|
ok;
|
||||||
shutdown(_Reason, _State) ->
|
shutdown(_Reason, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -283,11 +463,18 @@ timeout(_TRef, {incoming, NewVal},
|
||||||
|
|
||||||
timeout(_TRef, {outgoing, NewVal},
|
timeout(_TRef, {outgoing, NewVal},
|
||||||
State = #pstate{heart_beats = HrtBt,
|
State = #pstate{heart_beats = HrtBt,
|
||||||
|
statfun = {StatFun, StatArgs},
|
||||||
heartfun = {Fun, Args}}) ->
|
heartfun = {Fun, Args}}) ->
|
||||||
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
_ = erlang:apply(Fun, Args),
|
_ = erlang:apply(Fun, Args),
|
||||||
{ok, State};
|
case erlang:apply(StatFun, [send_oct] ++ StatArgs) of
|
||||||
|
{ok, NewVal2} ->
|
||||||
|
NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal2, HrtBt),
|
||||||
|
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})};
|
||||||
|
{error, Reason} ->
|
||||||
|
{shutdown, {error, {get_stats_error, Reason}}, State}
|
||||||
|
end;
|
||||||
{ok, NHrtBt} ->
|
{ok, NHrtBt} ->
|
||||||
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
|
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
|
||||||
end;
|
end;
|
||||||
|
@ -297,6 +484,28 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) ->
|
||||||
NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans),
|
NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans),
|
||||||
{ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}.
|
{ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec(handle_info(Info :: term(), pstate())
|
||||||
|
-> ok | {ok, pstate()} | {shutdown, Reason :: term(), pstate()}).
|
||||||
|
|
||||||
|
handle_info({subscribe, TopicFilters}, State) ->
|
||||||
|
NState = lists:foldl(
|
||||||
|
fun({TopicFilter, SubOpts}, StateAcc = #pstate{subscriptions = Subs}) ->
|
||||||
|
NSubOpts = enrich_sub_opts(SubOpts, Subs),
|
||||||
|
do_subscribe(TopicFilter, NSubOpts, StateAcc)
|
||||||
|
end, State, parse_topic_filters(TopicFilters)),
|
||||||
|
{ok, NState};
|
||||||
|
|
||||||
|
handle_info({unsubscribe, TopicFilters}, State) ->
|
||||||
|
NState = lists:foldl(fun({TopicFilter, SubOpts}, StateAcc) ->
|
||||||
|
do_unsubscribe(TopicFilter, SubOpts, StateAcc)
|
||||||
|
end, State, parse_topic_filters(TopicFilters)),
|
||||||
|
{ok, NState};
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?LOG(warning, "Unexpected info ~p", [Info]),
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
negotiate_version(undefined) ->
|
negotiate_version(undefined) ->
|
||||||
{ok, <<"1.0">>};
|
{ok, <<"1.0">>};
|
||||||
negotiate_version(Accepts) ->
|
negotiate_version(Accepts) ->
|
||||||
|
@ -312,13 +521,15 @@ negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer ->
|
||||||
negotiate_version(Ver, [_|T]) ->
|
negotiate_version(Ver, [_|T]) ->
|
||||||
negotiate_version(Ver, T).
|
negotiate_version(Ver, T).
|
||||||
|
|
||||||
check_login(undefined, _, AllowAnonymous, _) ->
|
check_login(Login, _, AllowAnonymous, _)
|
||||||
|
when Login == <<>>;
|
||||||
|
Login == undefined ->
|
||||||
AllowAnonymous;
|
AllowAnonymous;
|
||||||
check_login(_, _, _, undefined) ->
|
check_login(_, _, _, undefined) ->
|
||||||
false;
|
false;
|
||||||
check_login(Login, Passcode, _, DefaultUser) ->
|
check_login(Login, Passcode, _, DefaultUser) ->
|
||||||
case {list_to_binary(get_value(login, DefaultUser)),
|
case {iolist_to_binary(get_value(login, DefaultUser)),
|
||||||
list_to_binary(get_value(passcode, DefaultUser))} of
|
iolist_to_binary(get_value(passcode, DefaultUser))} of
|
||||||
{Login, Passcode} -> true;
|
{Login, Passcode} -> true;
|
||||||
{_, _ } -> false
|
{_, _ } -> false
|
||||||
end.
|
end.
|
||||||
|
@ -396,11 +607,18 @@ receipt_id(Headers) ->
|
||||||
|
|
||||||
handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
|
case check_acl(publish, Topic, State) of
|
||||||
|
allow ->
|
||||||
_ = maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
_ = emqx_broker:publish(
|
_ = emqx_broker:publish(
|
||||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
||||||
),
|
),
|
||||||
State.
|
State;
|
||||||
|
deny ->
|
||||||
|
ErrFrame = error_frame(receipt_id(Headers), <<"Not Authorized">>),
|
||||||
|
{ok, NState} = send(ErrFrame, State),
|
||||||
|
NState
|
||||||
|
end.
|
||||||
|
|
||||||
handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
|
@ -431,7 +649,111 @@ reverse_heartbeats({Cx, Cy}) ->
|
||||||
start_heartbeart_timer(Heartbeats, State) ->
|
start_heartbeart_timer(Heartbeats, State) ->
|
||||||
ensure_timer(
|
ensure_timer(
|
||||||
[incoming_timer, outgoing_timer],
|
[incoming_timer, outgoing_timer],
|
||||||
State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
|
State#pstate{heart_beats = emqx_stomp_heartbeat:init(backoff(Heartbeats))}).
|
||||||
|
|
||||||
|
backoff({Cx, Cy}) ->
|
||||||
|
{erlang:ceil(Cx * ?INCOMING_TIMER_BACKOFF),
|
||||||
|
erlang:ceil(Cy * ?OUTCOMING_TIMER_BACKOFF)}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% pub & sub helpers
|
||||||
|
|
||||||
|
parse_topic_filters(TopicFilters) ->
|
||||||
|
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
||||||
|
|
||||||
|
check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) ->
|
||||||
|
case is_acl_enabled(State) andalso
|
||||||
|
emqx_access_control:check_acl(ClientInfo, PubSub, Topic) of
|
||||||
|
false -> allow;
|
||||||
|
Res -> Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_subscribe(TopicFilter, SubOpts,
|
||||||
|
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
|
||||||
|
ClientId = maps:get(clientid, ClientInfo),
|
||||||
|
_ = emqx_broker:subscribe(TopicFilter, ClientId),
|
||||||
|
NSubOpts = SubOpts#{is_new => true},
|
||||||
|
_ = run_hooks('session.subscribed',
|
||||||
|
[ClientInfo, TopicFilter, NSubOpts]),
|
||||||
|
send_event_to_self(updated),
|
||||||
|
State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}.
|
||||||
|
|
||||||
|
do_unsubscribe(TopicFilter, SubOpts,
|
||||||
|
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
|
||||||
|
ok = emqx_broker:unsubscribe(TopicFilter),
|
||||||
|
_ = run_hooks('session.unsubscribe',
|
||||||
|
[ClientInfo, TopicFilter, SubOpts]),
|
||||||
|
send_event_to_self(updated),
|
||||||
|
State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}.
|
||||||
|
|
||||||
|
find_sub_by_topic(Topic, Subs) ->
|
||||||
|
case maps:get(Topic, Subs, undefined) of
|
||||||
|
undefined -> undefined;
|
||||||
|
SubOpts -> {Topic, SubOpts}
|
||||||
|
end.
|
||||||
|
|
||||||
|
find_sub_by_id(Id, Subs) ->
|
||||||
|
Found = maps:filter(fun(_, SubOpts) ->
|
||||||
|
%% FIXME: datatype??
|
||||||
|
maps:get(id, maps:get(sub_props, SubOpts, #{}), -1) == Id
|
||||||
|
end, Subs),
|
||||||
|
case maps:to_list(Found) of
|
||||||
|
[] -> undefined;
|
||||||
|
[Sub|_] -> Sub
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_acl_enabled(_) ->
|
||||||
|
%% TODO: configs from somewhere
|
||||||
|
true.
|
||||||
|
|
||||||
|
%% automaticly fill the next sub-id and ack if sub-id is absent
|
||||||
|
enrich_sub_opts(SubOpts0, Subs) ->
|
||||||
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
||||||
|
SubProps = maps:get(sub_props, SubOpts, #{}),
|
||||||
|
SubOpts#{sub_props =>
|
||||||
|
maps:merge(#{id => next_sub_id(Subs),
|
||||||
|
ack => ?DEFAULT_SUB_ACK}, SubProps)}.
|
||||||
|
|
||||||
|
next_sub_id(Subs) ->
|
||||||
|
Ids = maps:fold(fun(_, SubOpts, Acc) ->
|
||||||
|
[binary_to_integer(
|
||||||
|
maps:get(id, maps:get(sub_props, SubOpts, #{}), <<"0">>)) | Acc]
|
||||||
|
end, [], Subs),
|
||||||
|
integer_to_binary(lists:max(Ids) + 1).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% helpers
|
||||||
|
|
||||||
|
default_user(#pstate{default_user = DefaultUser}) ->
|
||||||
|
DefaultUser.
|
||||||
|
allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) ->
|
||||||
|
AllowAnonymous.
|
||||||
|
|
||||||
|
ensure_connected(State = #pstate{conninfo = ConnInfo,
|
||||||
|
clientinfo = ClientInfo}) ->
|
||||||
|
NConnInfo = ConnInfo#{
|
||||||
|
connected => true,
|
||||||
|
connected_at => erlang:system_time(millisecond)
|
||||||
|
},
|
||||||
|
send_event_to_self(connected),
|
||||||
|
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
||||||
|
State#pstate{conninfo = NConnInfo,
|
||||||
|
connected = true
|
||||||
|
}.
|
||||||
|
|
||||||
|
ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
||||||
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
||||||
|
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
||||||
|
State#pstate{conninfo = NConnInfo, connected = false}.
|
||||||
|
|
||||||
|
send_event_to_self(Name) ->
|
||||||
|
self() ! {event, Name}, ok.
|
||||||
|
|
||||||
|
run_hooks(Name, Args) ->
|
||||||
|
emqx_hooks:run(Name, Args).
|
||||||
|
|
||||||
|
run_hooks(Name, Args, Acc) ->
|
||||||
|
emqx_hooks:run_fold(Name, Args, Acc).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Timer
|
%% Timer
|
||||||
|
@ -466,3 +788,4 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
||||||
interval(clean_trans_timer, _) ->
|
interval(clean_trans_timer, _) ->
|
||||||
?TRANS_TIMEOUT.
|
?TRANS_TIMEOUT.
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ t_heartbeat(_) ->
|
||||||
{<<"host">>, <<"127.0.0.1:61613">>},
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
||||||
{<<"login">>, <<"guest">>},
|
{<<"login">>, <<"guest">>},
|
||||||
{<<"passcode">>, <<"guest">>},
|
{<<"passcode">>, <<"guest">>},
|
||||||
{<<"heart-beat">>, <<"1000,800">>}])),
|
{<<"heart-beat">>, <<"1000,2000">>}])),
|
||||||
{ok, Data} = gen_tcp:recv(Sock, 0),
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
||||||
headers = _,
|
headers = _,
|
||||||
|
|
|
@ -35,8 +35,7 @@ t_check_1(_) ->
|
||||||
{ok, HrtBt1} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt),
|
{ok, HrtBt1} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt),
|
||||||
{error, timeout} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt1),
|
{error, timeout} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt1),
|
||||||
|
|
||||||
{ok, HrtBt2} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt1),
|
{error, timeout} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt1),
|
||||||
{error, timeout} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt2),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_check_2(_) ->
|
t_check_2(_) ->
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_web_hook,
|
{application, emqx_web_hook,
|
||||||
[{description, "EMQ X WebHook Plugin"},
|
[{description, "EMQ X WebHook Plugin"},
|
||||||
{vsn, "4.3.6"}, % strict semver, bump manually!
|
{vsn, "4.3.7"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_web_hook_sup]},
|
{registered, [emqx_web_hook_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
2
bin/emqx
2
bin/emqx
|
@ -27,7 +27,6 @@ export EMU="beam"
|
||||||
export PROGNAME="erl"
|
export PROGNAME="erl"
|
||||||
DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs"
|
DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs"
|
||||||
ERTS_LIB_DIR="$ERTS_DIR/../lib"
|
ERTS_LIB_DIR="$ERTS_DIR/../lib"
|
||||||
MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
|
|
||||||
|
|
||||||
# Echo to stderr on errors
|
# Echo to stderr on errors
|
||||||
echoerr() { echo "$*" 1>&2; }
|
echoerr() { echo "$*" 1>&2; }
|
||||||
|
@ -409,6 +408,7 @@ case $NAME in
|
||||||
NAME=$NAME@$(relx_get_nodename)
|
NAME=$NAME@$(relx_get_nodename)
|
||||||
;;
|
;;
|
||||||
esac
|
esac
|
||||||
|
MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
|
||||||
|
|
||||||
# Check the first argument for instructions
|
# Check the first argument for instructions
|
||||||
case "$1" in
|
case "$1" in
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_dashboard,
|
{application, emqx_dashboard,
|
||||||
[{description, "EMQ X Web Dashboard"},
|
[{description, "EMQ X Web Dashboard"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_dashboard_sup]},
|
{registered, [emqx_dashboard_sup]},
|
||||||
{applications, [kernel,stdlib,mnesia,minirest]},
|
{applications, [kernel,stdlib,mnesia,minirest]},
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.3"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.4"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.6.0"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.6.0"}}}
|
||||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
||||||
|
|
|
@ -5,16 +5,16 @@
|
||||||
|
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
ELVIS_VERSION='1.0.0-emqx-2'
|
elvis_version='1.0.0-emqx-2'
|
||||||
|
|
||||||
base="${1:-}"
|
base="${1:-}"
|
||||||
|
repo="${2:-emqx/emqx}"
|
||||||
|
REPO="${GITHUB_REPOSITORY:-${repo}}"
|
||||||
if [ "${base}" = "" ]; then
|
if [ "${base}" = "" ]; then
|
||||||
echo "Usage $0 <git-compare-base-ref>"
|
echo "Usage $0 <git-compare-base-ref>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
elvis_version="${2:-$ELVIS_VERSION}"
|
|
||||||
|
|
||||||
echo "elvis -v: $elvis_version"
|
echo "elvis -v: $elvis_version"
|
||||||
echo "git diff base: $base"
|
echo "git diff base: $base"
|
||||||
|
|
||||||
|
@ -27,11 +27,7 @@ if [[ "$base" =~ [0-9a-f]{8,40} ]]; then
|
||||||
# base is a commit sha1
|
# base is a commit sha1
|
||||||
compare_base="$base"
|
compare_base="$base"
|
||||||
else
|
else
|
||||||
if [[ $CI == true ]];then
|
remote="$(git remote -v | grep -E "github\.com(:|/)$REPO((\.git)|(\s))" | grep fetch | awk '{print $1}')"
|
||||||
remote="$(git remote -v | grep -E "github\.com(.|/)$GITHUB_REPOSITORY" | grep fetch | awk '{print $1}')"
|
|
||||||
else
|
|
||||||
remote="$(git remote -v | grep -E 'github\.com(.|/)emqx' | grep fetch | awk '{print $1}')"
|
|
||||||
fi
|
|
||||||
git fetch "$remote" "$base"
|
git fetch "$remote" "$base"
|
||||||
compare_base="$remote/$base"
|
compare_base="$remote/$base"
|
||||||
fi
|
fi
|
||||||
|
@ -58,3 +54,31 @@ if [ $bad_file_count -gt 0 ]; then
|
||||||
echo "elvis: $bad_file_count errors"
|
echo "elvis: $bad_file_count errors"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
### now check new-line at EOF for changed files
|
||||||
|
|
||||||
|
nl_at_eof() {
|
||||||
|
local file="$1"
|
||||||
|
if ! [ -f "$file" ]; then
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
case "$file" in
|
||||||
|
*.png|*rebar3)
|
||||||
|
return
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
local lastbyte
|
||||||
|
lastbyte="$(tail -c 1 "$file" 2>&1)"
|
||||||
|
if [ "$lastbyte" != '' ]; then
|
||||||
|
echo "$file"
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
for file in $(git_diff); do
|
||||||
|
if ! nl_at_eof "$file"; then
|
||||||
|
bad_file_count=$(( bad_file_count + 1 ))
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
exit $bad_file_count
|
||||||
|
|
|
@ -21,12 +21,11 @@ support other repos too.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
|
|
||||||
update_appup.escript [--check] [--repo URL] [--remote NAME] [--skip-build] [--make-commad SCRIPT] [--release-dir DIR] <current_release_tag>
|
update_appup.escript [--check] [--repo URL] [--remote NAME] [--skip-build] [--make-commad SCRIPT] [--release-dir DIR] <previous_release_tag>
|
||||||
|
|
||||||
Options:
|
Options:
|
||||||
|
|
||||||
--check Don't update the appfile, just check that they are complete
|
--check Don't update the appfile, just check that they are complete
|
||||||
--prev-tag Specify the previous release tag. Otherwise the previous patch version is used
|
|
||||||
--repo Upsteam git repo URL
|
--repo Upsteam git repo URL
|
||||||
--remote Get upstream repo URL from the specified git remote
|
--remote Get upstream repo URL from the specified git remote
|
||||||
--skip-build Don't rebuild the releases. May produce wrong results
|
--skip-build Don't rebuild the releases. May produce wrong results
|
||||||
|
@ -34,7 +33,7 @@ Options:
|
||||||
--release-dir Release directory
|
--release-dir Release directory
|
||||||
--src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
|
--src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
|
||||||
--binary-rel-url Binary release URL pattern. %TAG% variable is substituted with the release tag.
|
--binary-rel-url Binary release URL pattern. %TAG% variable is substituted with the release tag.
|
||||||
E.g. \"https://github.com/emqx/emqx/releases/download/v4.3.8/emqx-centos7-%TAG%-amd64.zip\"
|
E.g. \"https://github.com/emqx/emqx/releases/download/v%TAG%/emqx-centos7-%TAG%-amd64.zip\"
|
||||||
".
|
".
|
||||||
|
|
||||||
-record(app,
|
-record(app,
|
||||||
|
@ -60,18 +59,12 @@ ignored_apps() ->
|
||||||
[emqx_dashboard, emqx_management] ++ otp_standard_apps().
|
[emqx_dashboard, emqx_management] ++ otp_standard_apps().
|
||||||
|
|
||||||
main(Args) ->
|
main(Args) ->
|
||||||
#{current_release := CurrentRelease} = Options = parse_args(Args, default_options()),
|
#{prev_tag := Baseline} = Options = parse_args(Args, default_options()),
|
||||||
init_globals(Options),
|
init_globals(Options),
|
||||||
case find_prev_tag(CurrentRelease) of
|
main(Options, Baseline).
|
||||||
{ok, Baseline} ->
|
|
||||||
main(Options, Baseline);
|
|
||||||
undefined ->
|
|
||||||
log("No appup update is needed for this release, nothing to be done~n", []),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
parse_args([CurrentRelease = [A|_]], State) when A =/= $- ->
|
parse_args([PrevTag = [A|_]], State) when A =/= $- ->
|
||||||
State#{current_release => CurrentRelease};
|
State#{prev_tag => PrevTag};
|
||||||
parse_args(["--check"|Rest], State) ->
|
parse_args(["--check"|Rest], State) ->
|
||||||
parse_args(Rest, State#{check => true});
|
parse_args(Rest, State#{check => true});
|
||||||
parse_args(["--skip-build"|Rest], State) ->
|
parse_args(["--skip-build"|Rest], State) ->
|
||||||
|
@ -86,8 +79,6 @@ parse_args(["--release-dir", Dir|Rest], State) ->
|
||||||
parse_args(Rest, State#{beams_dir => Dir});
|
parse_args(Rest, State#{beams_dir => Dir});
|
||||||
parse_args(["--src-dirs", Pattern|Rest], State) ->
|
parse_args(["--src-dirs", Pattern|Rest], State) ->
|
||||||
parse_args(Rest, State#{src_dirs => Pattern});
|
parse_args(Rest, State#{src_dirs => Pattern});
|
||||||
parse_args(["--prev-tag", Tag|Rest], State) ->
|
|
||||||
parse_args(Rest, State#{prev_tag => Tag});
|
|
||||||
parse_args(["--binary-rel-url", URL|Rest], State) ->
|
parse_args(["--binary-rel-url", URL|Rest], State) ->
|
||||||
parse_args(Rest, State#{binary_rel_url => {ok, URL}});
|
parse_args(Rest, State#{binary_rel_url => {ok, URL}});
|
||||||
parse_args(_, _) ->
|
parse_args(_, _) ->
|
||||||
|
@ -163,7 +154,7 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) -
|
||||||
Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
|
Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
|
||||||
Filename = filename:join(BaseDir, Dir),
|
Filename = filename:join(BaseDir, Dir),
|
||||||
Script = "mkdir -p ${OUTFILE} &&
|
Script = "mkdir -p ${OUTFILE} &&
|
||||||
{ [ -f ${OUTFILE}.zip ] || wget -O ${OUTFILE}.zip ${URL}; } &&
|
wget -O ${OUTFILE}.zip ${URL} &&
|
||||||
unzip -n -d ${OUTFILE} ${OUTFILE}.zip",
|
unzip -n -d ${OUTFILE} ${OUTFILE}.zip",
|
||||||
Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
|
Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
|
||||||
bash(Script, Env),
|
bash(Script, Env),
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{application, emqx,
|
{application, emqx,
|
||||||
[{id, "emqx"},
|
[{id, "emqx"},
|
||||||
{description, "EMQ X"},
|
{description, "EMQ X"},
|
||||||
{vsn, "4.3.10"}, % strict semver, bump manually!
|
{vsn, "4.3.11"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.9",
|
[{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{"4.3.9",
|
||||||
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -10,7 +12,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -19,7 +22,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
@ -30,7 +34,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
@ -42,7 +47,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
@ -55,7 +61,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
@ -69,7 +76,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
@ -147,8 +155,10 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.9",
|
[{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{"4.3.9",
|
||||||
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -157,7 +167,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -166,7 +177,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -177,7 +189,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -189,7 +202,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -202,7 +216,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -216,7 +231,8 @@
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -702,7 +702,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
||||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||||
inc_counter(outgoing_bytes, Oct),
|
inc_counter(outgoing_bytes, Oct),
|
||||||
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
||||||
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
case Transport:async_send(Socket, IoData, []) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
Error = {error, _Reason} ->
|
Error = {error, _Reason} ->
|
||||||
%% Send an inet_reply to postpone handling the error
|
%% Send an inet_reply to postpone handling the error
|
||||||
|
|
|
@ -94,7 +94,10 @@
|
||||||
-type(ver() :: ?MQTT_PROTO_V3
|
-type(ver() :: ?MQTT_PROTO_V3
|
||||||
| ?MQTT_PROTO_V4
|
| ?MQTT_PROTO_V4
|
||||||
| ?MQTT_PROTO_V5
|
| ?MQTT_PROTO_V5
|
||||||
| non_neg_integer()).
|
| non_neg_integer()
|
||||||
|
%% Some non-MQTT versions of protocol may be a binary type
|
||||||
|
| binary()
|
||||||
|
).
|
||||||
|
|
||||||
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
|
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
|
||||||
-type(qos_name() :: qos0 | at_most_once |
|
-type(qos_name() :: qos0 | at_most_once |
|
||||||
|
|
Loading…
Reference in New Issue