Merge remote-tracking branch 'origin/main-v4.3' into main-v4.4
This commit is contained in:
commit
71299a2bcf
|
@ -133,7 +133,7 @@ jobs:
|
||||||
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
|
||||||
- uses: actions/upload-artifact@v3
|
- uses: actions/upload-artifact@v3
|
||||||
with:
|
with:
|
||||||
name: ${{ env.EMQX_NAME }}-${{ matrix.otp }}
|
name: ${{ env.EMQX_NAME }}
|
||||||
path: _packages/${{ env.EMQX_NAME }}/
|
path: _packages/${{ env.EMQX_NAME }}/
|
||||||
|
|
||||||
linux:
|
linux:
|
||||||
|
|
|
@ -36,9 +36,7 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: AutoModality/action-clean@v1
|
- uses: AutoModality/action-clean@v1
|
||||||
# keep using v1 for now as the otp-23 image has an old version git
|
- uses: actions/checkout@v3
|
||||||
# TODO: change to v3 after OTP is upgraded to 23.3.4.18-1
|
|
||||||
- uses: actions/checkout@v1
|
|
||||||
with:
|
with:
|
||||||
fetch-depth: 0 # clone full git history
|
fetch-depth: 0 # clone full git history
|
||||||
- name: fix-git-unsafe-repository
|
- name: fix-git-unsafe-repository
|
||||||
|
|
|
@ -67,7 +67,7 @@ check_acl(ClientInfo = #{jwt_claims := Claims},
|
||||||
case is_expired(Exp) of
|
case is_expired(Exp) of
|
||||||
true ->
|
true ->
|
||||||
?DEBUG("acl_deny_due_to_jwt_expired", []),
|
?DEBUG("acl_deny_due_to_jwt_expired", []),
|
||||||
deny;
|
{stop, deny};
|
||||||
false ->
|
false ->
|
||||||
verify_acl(ClientInfo, Acl, PubSub, Topic)
|
verify_acl(ClientInfo, Acl, PubSub, Topic)
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -462,6 +462,16 @@ t_check_jwt_acl_expire(_Config) ->
|
||||||
{ok, #{}, [?RC_NOT_AUTHORIZED]},
|
{ok, #{}, [?RC_NOT_AUTHORIZED]},
|
||||||
emqtt:subscribe(C, <<"a/b">>, 0)),
|
emqtt:subscribe(C, <<"a/b">>, 0)),
|
||||||
|
|
||||||
|
Default = emqx_zone:get_env(external, acl_nomatch, deny),
|
||||||
|
emqx_zone:set_env(external, acl_nomatch, allow),
|
||||||
|
try
|
||||||
|
?assertMatch(
|
||||||
|
{ok, #{}, [?RC_NOT_AUTHORIZED]},
|
||||||
|
emqtt:subscribe(C, <<"a/b">>, 0))
|
||||||
|
after
|
||||||
|
emqx_zone:set_env(external, acl_nomatch, Default)
|
||||||
|
end,
|
||||||
|
|
||||||
ok = emqtt:disconnect(C).
|
ok = emqtt:disconnect(C).
|
||||||
|
|
||||||
t_check_jwt_acl_no_exp(init, _Config) ->
|
t_check_jwt_acl_no_exp(init, _Config) ->
|
||||||
|
|
10
bin/emqx
10
bin/emqx
|
@ -324,7 +324,7 @@ relx_rem_sh() {
|
||||||
|
|
||||||
# Generate a random id
|
# Generate a random id
|
||||||
relx_gen_id() {
|
relx_gen_id() {
|
||||||
od -t x -N 4 /dev/urandom | head -n1 | awk '{print $2}'
|
od -t u -N 4 /dev/urandom | head -n1 | awk '{print $2 % 1000}'
|
||||||
}
|
}
|
||||||
|
|
||||||
# Control a node
|
# Control a node
|
||||||
|
@ -579,9 +579,13 @@ if [ -z "$COOKIE" ]; then
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ -z "$COOKIE" ]; then
|
[ -z "$COOKIE" ] && COOKIE="$EMQX_DEFAULT_ERLANG_COOKIE"
|
||||||
|
if [ $IS_BOOT_COMMAND = 'yes' ] && [ "$COOKIE" = "$EMQX_DEFAULT_ERLANG_COOKIE" ]; then
|
||||||
|
echoerr "!!!!!!"
|
||||||
|
echoerr "WARNING: Default (insecure) Erlang cookie is in use."
|
||||||
echoerr "Please set node.cookie in $RUNNER_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE_COOKIE"
|
echoerr "Please set node.cookie in $RUNNER_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE_COOKIE"
|
||||||
exit 1
|
echoerr "NOTE: Use the same config value for all nodes in the cluster."
|
||||||
|
echoerr "!!!!!!"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
cd "$ROOTDIR"
|
cd "$ROOTDIR"
|
||||||
|
|
|
@ -202,7 +202,12 @@ nodename(Name) ->
|
||||||
|
|
||||||
this_node_name(Name) ->
|
this_node_name(Name) ->
|
||||||
[Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
|
[Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
|
||||||
list_to_atom(lists:concat(["remsh_maint_", Node, os:getpid(), "@", Host])).
|
list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host])).
|
||||||
|
|
||||||
|
%% use the reversed value that from pid mod 1000 as the node name suffix
|
||||||
|
node_name_suffix_id() ->
|
||||||
|
Pid = os:getpid(),
|
||||||
|
string:slice(string:reverse(Pid), 0, 3).
|
||||||
|
|
||||||
%% For windows???
|
%% For windows???
|
||||||
create_mnesia_dir(DataDir, NodeName) ->
|
create_mnesia_dir(DataDir, NodeName) ->
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
# v4.3.23
|
||||||
|
|
||||||
|
## Enhancements
|
||||||
|
|
||||||
|
- Added topic validation for `emqx_mod_rewrite`. The dest topics contains wildcards are not allowed to publish [#9359](https://github.com/emqx/emqx/issues/9359).
|
||||||
|
|
||||||
|
- Print a warning message when boot with the default (insecure) Erlang cookie [#9340](https://github.com/emqx/emqx/pull/9340).
|
||||||
|
|
||||||
|
- Improve node name generation rules to avoid potential atom table overflow risk [#9391](https://github.com/emqx/emqx/pull/9391).
|
||||||
|
|
||||||
|
## Bug fixes
|
||||||
|
|
||||||
|
- Fix a bug where the JWT ACL would not short-circuit with a deny response when the token is expired [#9338](https://github.com/emqx/emqx/pull/9338).
|
|
@ -0,0 +1,13 @@
|
||||||
|
# v4.3.23
|
||||||
|
|
||||||
|
## 增强
|
||||||
|
|
||||||
|
- 为主题重写模块增加主题合法性检查,带有通配符的目标主题不允许被发布 [#9359](https://github.com/emqx/emqx/issues/9359)。
|
||||||
|
|
||||||
|
- 使用默认的(不安全的) Erlang cookie 进行启动时,将会打印一条警告信息 [#9340](https://github.com/emqx/emqx/pull/9340)。
|
||||||
|
|
||||||
|
- 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9391](https://github.com/emqx/emqx/pull/9391)。
|
||||||
|
|
||||||
|
## 修复
|
||||||
|
|
||||||
|
- 修复 JWT ACL 在令牌超期后授权检查不生效的问题 [#9338](https://github.com/emqx/emqx/pull/9338)。
|
|
@ -14,6 +14,10 @@ RUNNER_DATA_DIR="{{ runner_data_dir }}"
|
||||||
RUNNER_USER="{{ runner_user }}"
|
RUNNER_USER="{{ runner_user }}"
|
||||||
EMQX_DESCRIPTION='{{ emqx_description }}'
|
EMQX_DESCRIPTION='{{ emqx_description }}'
|
||||||
|
|
||||||
|
## Do not change EMQX_DEFAULT_ERLANG_COOKIE.
|
||||||
|
## Configure EMQX_NODE_COOKIE instead
|
||||||
|
EMQX_DEFAULT_ERLANG_COOKIE='{{ emqx_default_erlang_cookie }}'
|
||||||
|
|
||||||
## Warning: DO NOT create new variables using the above vars in this file,
|
## Warning: DO NOT create new variables using the above vars in this file,
|
||||||
## as the vars above can be overwritten by the relup scripts later, like:
|
## as the vars above can be overwritten by the relup scripts later, like:
|
||||||
## REL_VSN="new_version"
|
## REL_VSN="new_version"
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ compile/1
|
-export([ compile_rules/1
|
||||||
, match_and_rewrite/3
|
, match_and_rewrite/3
|
||||||
]).
|
]).
|
||||||
-endif.
|
-endif.
|
||||||
|
@ -40,13 +40,14 @@
|
||||||
, description/0
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-type(topic() :: binary()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
load(RawRules) ->
|
load(RawRules) ->
|
||||||
{PubRules, SubRules} = compile(RawRules),
|
{PubRules, SubRules} = compile_rules(RawRules),
|
||||||
log_start(RawRules),
|
|
||||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
||||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
||||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
||||||
|
@ -75,30 +76,59 @@ description() ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
log_start(Rules) ->
|
compile_rules(RawRules) ->
|
||||||
PubRules = [{pub, Topic, Re, Dest} || {rewrite, pub, Topic, Re, Dest} <- Rules],
|
compile(validate_rules(RawRules)).
|
||||||
SubRules = [{sub, Topic, Re, Dest} || {rewrite, sub, Topic, Re, Dest} <- Rules],
|
|
||||||
|
compile({PubRules, SubRules}) ->
|
||||||
|
CompileRE =
|
||||||
|
fun({rewrite, RewriteFrom, Re, RewriteTo}) ->
|
||||||
|
{ok, MP} = re:compile(Re),
|
||||||
|
{rewrite, RewriteFrom, MP, RewriteTo}
|
||||||
|
end,
|
||||||
|
{lists:map(CompileRE, PubRules), lists:map(CompileRE, SubRules)}.
|
||||||
|
|
||||||
|
validate_rules(Rules) ->
|
||||||
|
PubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
||||||
|
{rewrite, pub, RewriteFrom, Re, RewriteTo} <- Rules,
|
||||||
|
validate_rule(pub, RewriteFrom, RewriteTo)
|
||||||
|
],
|
||||||
|
SubRules = [{rewrite, RewriteFrom, Re, RewriteTo} ||
|
||||||
|
{rewrite, sub, RewriteFrom, Re, RewriteTo} <- Rules,
|
||||||
|
validate_rule(sub, RewriteFrom, RewriteTo)
|
||||||
|
],
|
||||||
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
|
?LOG(info, "[Rewrite] Load: pub rules count ~p sub rules count ~p",
|
||||||
[erlang:length(PubRules), erlang:length(SubRules)]),
|
[erlang:length(PubRules), erlang:length(SubRules)]),
|
||||||
log_rule(PubRules, 1),
|
log_rules(pub, PubRules),
|
||||||
log_rule(SubRules, 1).
|
log_rules(sub, SubRules),
|
||||||
|
{PubRules, SubRules}.
|
||||||
|
|
||||||
log_rule([], _Index) -> ok;
|
validate_rule(Type, RewriteFrom, RewriteTo) ->
|
||||||
log_rule([{Type, Topic, Re, Dest} | Rules], Index) ->
|
case validate_topic(filter, RewriteFrom) of
|
||||||
|
ok ->
|
||||||
|
case validate_topic(dest_topic_type(Type), RewriteTo) of
|
||||||
|
ok ->
|
||||||
|
true;
|
||||||
|
{error, Reason} ->
|
||||||
|
log_invalid_rule(to, Type, RewriteTo, Reason),
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
log_invalid_rule(from, Type, RewriteFrom, Reason),
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
log_invalid_rule(Direction, Type, Topic, Reason) ->
|
||||||
|
?LOG(warning, "Invalid rewrite ~p rule for rewrite ~p topic '~ts' discarded. Reason: ~p",
|
||||||
|
[type_to_name(Type), Direction, Topic, Reason]).
|
||||||
|
|
||||||
|
log_rules(Type, Rules) ->
|
||||||
|
do_log_rules(Type, Rules, 1).
|
||||||
|
|
||||||
|
do_log_rules(_Type, [], _Index) -> ok;
|
||||||
|
do_log_rules(Type, [{_, Topic, Re, Dest} | Rules], Index) ->
|
||||||
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
|
?LOG(info, "[Rewrite] Load ~p rule[~p]: source: ~ts, re: ~ts, dest: ~ts",
|
||||||
[Type, Index, Topic, Re, Dest]),
|
[Type, Index, Topic, Re, Dest]),
|
||||||
log_rule(Rules, Index + 1).
|
do_log_rules(Type, Rules, Index + 1).
|
||||||
|
|
||||||
compile(Rules) ->
|
|
||||||
PubRules = [ begin
|
|
||||||
{ok, MP} = re:compile(Re),
|
|
||||||
{rewrite, Topic, MP, Dest}
|
|
||||||
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
|
|
||||||
SubRules = [ begin
|
|
||||||
{ok, MP} = re:compile(Re),
|
|
||||||
{rewrite, Topic, MP, Dest}
|
|
||||||
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
|
|
||||||
{PubRules, SubRules}.
|
|
||||||
|
|
||||||
match_and_rewrite(Topic, [], _) ->
|
match_and_rewrite(Topic, [], _) ->
|
||||||
Topic;
|
Topic;
|
||||||
|
@ -138,3 +168,19 @@ filter_client_binds(Binds) ->
|
||||||
(_) -> true
|
(_) -> true
|
||||||
end,
|
end,
|
||||||
Binds).
|
Binds).
|
||||||
|
|
||||||
|
type_to_name(pub) -> 'PUBLISH';
|
||||||
|
type_to_name(sub) -> 'SUBSCRIBE'.
|
||||||
|
|
||||||
|
dest_topic_type(pub) -> name;
|
||||||
|
dest_topic_type(sub) -> filter.
|
||||||
|
|
||||||
|
-spec(validate_topic(name | filter, topic()) -> ok | {error, term()}).
|
||||||
|
validate_topic(Type, Topic) ->
|
||||||
|
try
|
||||||
|
true = emqx_topic:validate(Type, Topic),
|
||||||
|
ok
|
||||||
|
catch
|
||||||
|
error:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
|
@ -30,6 +30,13 @@
|
||||||
{rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>}
|
{rewrite, sub, <<"c/#">>,<<"^c/(.+)$">>,<<"sub/%c/$1">>}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(BAD_RULES_1, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/+">>}]).
|
||||||
|
|
||||||
|
%% empty topic filter/name won't be ranched cased `emqx.conf` will be checked before emqx started
|
||||||
|
%% but we need this check for emqx-ee modules api
|
||||||
|
-define(BAD_RULES_2, [{rewrite, pub, <<"">>,<<"^x/y/(.+)$">>,<<"z/y/+">>}]).
|
||||||
|
-define(BAD_RULES_3, [{rewrite, pub, <<"name/#">>,<<"^name/(.+)$">>,<<"">>}]).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -82,12 +89,30 @@ t_mod_rewrite(_Config) ->
|
||||||
ok = emqx_mod_rewrite:unload(?RULES).
|
ok = emqx_mod_rewrite:unload(?RULES).
|
||||||
|
|
||||||
t_rewrite_rule(_Config) ->
|
t_rewrite_rule(_Config) ->
|
||||||
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
|
{PubRules, SubRules} = emqx_mod_rewrite:compile_rules(?RULES),
|
||||||
|
%% assert ordering
|
||||||
|
?assertMatch([{rewrite, <<"x/#">>, _, <<"z/y/$1">>},
|
||||||
|
{rewrite, <<"name/#">>, _, <<"pub/%u/$1">>},
|
||||||
|
{rewrite, <<"c/#">>, _, <<"pub/%c/$1">>}],
|
||||||
|
PubRules),
|
||||||
|
?assertMatch([{rewrite, <<"y/+/z/#">>, _, <<"y/z/$2">>},
|
||||||
|
{rewrite, <<"name/#">>, _, <<"sub/%u/$1">>},
|
||||||
|
{rewrite, <<"c/#">>, _, <<"sub/%c/$1">>}],
|
||||||
|
SubRules),
|
||||||
|
|
||||||
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
|
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
|
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
|
||||||
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
|
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
|
||||||
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
|
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_1(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_1)).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_2(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_2)).
|
||||||
|
|
||||||
|
t_rewrite_bad_rule_3(_Config) ->
|
||||||
|
?assertEqual({[], []}, emqx_mod_rewrite:compile_rules(?BAD_RULES_3)).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -196,18 +196,23 @@ overlay_vars(RelType, PkgType, false) ->
|
||||||
overlay_vars_rel(RelType) ++ overlay_vars_pkg(PkgType).
|
overlay_vars_rel(RelType) ++ overlay_vars_pkg(PkgType).
|
||||||
|
|
||||||
%% vars per release type, cloud or edge
|
%% vars per release type, cloud or edge
|
||||||
overlay_vars_rel(RelType) ->
|
overlay_vars_rel(cloud) ->
|
||||||
VmArgs = case RelType of
|
[ {vm_args_file, "vm.args"}
|
||||||
cloud -> "vm.args";
|
| overlay_vars_rel_common(cloud)
|
||||||
edge -> "vm.args.edge"
|
];
|
||||||
end,
|
overlay_vars_rel(edge) ->
|
||||||
[ {enable_plugin_emqx_rule_engine, RelType =:= cloud}
|
[ {vm_args_file, "vm.args.edge"}
|
||||||
|
| overlay_vars_rel_common(edge)
|
||||||
|
].
|
||||||
|
|
||||||
|
overlay_vars_rel_common(RelType) ->
|
||||||
|
[ {emqx_default_erlang_cookie, "emqxsecretcookie"}
|
||||||
|
, {enable_plugin_emqx_rule_engine, RelType =:= cloud}
|
||||||
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
|
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
|
||||||
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
|
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
|
||||||
, {enable_plugin_emqx_recon, true}
|
, {enable_plugin_emqx_recon, true}
|
||||||
, {enable_plugin_emqx_retainer, true}
|
, {enable_plugin_emqx_retainer, true}
|
||||||
, {enable_plugin_emqx_telemetry, true}
|
, {enable_plugin_emqx_telemetry, true}
|
||||||
, {vm_args_file, VmArgs}
|
|
||||||
].
|
].
|
||||||
|
|
||||||
%% vars per packaging type, bin(zip/tar.gz/docker) or pkg(rpm/deb)
|
%% vars per packaging type, bin(zip/tar.gz/docker) or pkg(rpm/deb)
|
||||||
|
|
|
@ -669,7 +669,6 @@ after_message_acked(ClientInfo, Msg, PubAckProps) ->
|
||||||
%% Process Subscribe
|
%% Process Subscribe
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-compile({inline, [process_subscribe/3]}).
|
|
||||||
process_subscribe(TopicFilters, SubProps, Channel) ->
|
process_subscribe(TopicFilters, SubProps, Channel) ->
|
||||||
process_subscribe(TopicFilters, SubProps, Channel, []).
|
process_subscribe(TopicFilters, SubProps, Channel, []).
|
||||||
|
|
||||||
|
|
|
@ -324,11 +324,17 @@ t_handle_in_pubcomp_not_found_error(_) ->
|
||||||
t_handle_in_subscribe(_) ->
|
t_handle_in_subscribe(_) ->
|
||||||
ok = meck:expect(emqx_session, subscribe,
|
ok = meck:expect(emqx_session, subscribe,
|
||||||
fun(_, _, _, Session) -> {ok, Session} end),
|
fun(_, _, _, Session) -> {ok, Session} end),
|
||||||
|
meck:new(emqx_mqtt_caps),
|
||||||
|
ok = meck:expect(emqx_mqtt_caps, check_sub, fun(_, _, _) -> ok end),
|
||||||
|
try
|
||||||
Channel = channel(#{conn_state => connected}),
|
Channel = channel(#{conn_state => connected}),
|
||||||
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
|
Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
|
||||||
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_0])}, {event, updated}],
|
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_0])}, {event, updated}],
|
||||||
{ok, Replies, _Chan} = emqx_channel:handle_in(Subscribe, Channel).
|
{ok, Replies, _Chan} = emqx_channel:handle_in(Subscribe, Channel)
|
||||||
|
after
|
||||||
|
meck:unload(emqx_mqtt_caps)
|
||||||
|
end.
|
||||||
|
|
||||||
t_handle_in_unsubscribe(_) ->
|
t_handle_in_unsubscribe(_) ->
|
||||||
ok = meck:expect(emqx_session, unsubscribe,
|
ok = meck:expect(emqx_session, unsubscribe,
|
||||||
|
@ -397,9 +403,15 @@ t_process_publish_qos1(_) ->
|
||||||
|
|
||||||
t_process_subscribe(_) ->
|
t_process_subscribe(_) ->
|
||||||
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
||||||
|
meck:new(emqx_mqtt_caps),
|
||||||
|
ok = meck:expect(emqx_mqtt_caps, check_sub, fun(_, _, _) -> ok end),
|
||||||
|
try
|
||||||
TopicFilters = [ TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}],
|
TopicFilters = [ TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}],
|
||||||
{[{TopicFilter, ?RC_SUCCESS}], _Channel} =
|
{[{TopicFilter, ?RC_SUCCESS}], _Channel} =
|
||||||
emqx_channel:process_subscribe(TopicFilters, #{}, channel()).
|
emqx_channel:process_subscribe(TopicFilters, #{}, channel())
|
||||||
|
after
|
||||||
|
meck:unload(emqx_mqtt_caps)
|
||||||
|
end.
|
||||||
|
|
||||||
t_process_unsubscribe(_) ->
|
t_process_unsubscribe(_) ->
|
||||||
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
||||||
|
|
Loading…
Reference in New Issue