commit
938607baca
|
@ -20,6 +20,8 @@ File format:
|
||||||
- Fixed crash when shared persistent subscription [#8441]
|
- Fixed crash when shared persistent subscription [#8441]
|
||||||
- Fixed issue in Lua hook that prevented messages from being
|
- Fixed issue in Lua hook that prevented messages from being
|
||||||
rejected [#8535]
|
rejected [#8535]
|
||||||
|
- Fix ExProto UDP client keepalive checking error.
|
||||||
|
This causes the clients to not expire as long as a new UDP packet arrives [#8575]
|
||||||
|
|
||||||
### Enhancements
|
### Enhancements
|
||||||
- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450]
|
- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450]
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_exproto,
|
{application, emqx_exproto,
|
||||||
[{description, "EMQ X Extension for Protocol"},
|
[{description, "EMQ X Extension for Protocol"},
|
||||||
{vsn, "4.3.8"}, %% 4.3.3 is used by ee
|
{vsn, "4.3.9"}, %% 4.3.3 is used by ee
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_exproto_app, []}},
|
{mod, {emqx_exproto_app, []}},
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{<<"4\\.3\\.[6-7]">>,
|
[{<<"4\\.3\\.[2-8]">>,
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
|
||||||
{<<"4\\.3\\.[2-5]">>,
|
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
|
@ -12,9 +10,7 @@
|
||||||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{<<"4\\.3\\.[6-7]">>,
|
[{<<"4\\.3\\.[2-8]">>,
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
|
||||||
{<<"4\\.3\\.[2-5]">>,
|
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
|
|
|
@ -260,7 +260,8 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
{ok, reset_timer(alive_timer, NChannel)};
|
{ok, reset_timer(alive_timer, NChannel)};
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
Req = #{type => 'KEEPALIVE'},
|
Req = #{type => 'KEEPALIVE'},
|
||||||
{ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
|
NChannel = clean_timer(alive_timer, Channel),
|
||||||
|
{ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||||
|
@ -327,7 +328,7 @@ handle_call({start_timer, keepalive, Interval},
|
||||||
NConnInfo = ConnInfo#{keepalive => Interval},
|
NConnInfo = ConnInfo#{keepalive => Interval},
|
||||||
NClientInfo = ClientInfo#{keepalive => Interval},
|
NClientInfo = ClientInfo#{keepalive => Interval},
|
||||||
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
||||||
{reply, ok, ensure_keepalive(NChannel)};
|
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)};
|
||||||
|
|
||||||
handle_call({subscribe, TopicFilter, Qos},
|
handle_call({subscribe, TopicFilter, Qos},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
@ -339,13 +340,13 @@ handle_call({subscribe, TopicFilter, Qos},
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
||||||
{reply, ok, NChannel}
|
{reply, ok, [{event, updated}], NChannel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({unsubscribe, TopicFilter},
|
handle_call({unsubscribe, TopicFilter},
|
||||||
Channel = #channel{conn_state = connected}) ->
|
Channel = #channel{conn_state = connected}) ->
|
||||||
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
|
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
|
||||||
{reply, ok, NChannel};
|
{reply, ok, [{event, updated}], NChannel};
|
||||||
|
|
||||||
handle_call({publish, Topic, Qos, Payload},
|
handle_call({publish, Topic, Qos, Payload},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
|
|
@ -211,16 +211,35 @@ esockd_setopts({esockd_transport, Socket}, Opts) ->
|
||||||
%% FIXME: DTLS works??
|
%% FIXME: DTLS works??
|
||||||
esockd_transport:setopts(Socket, Opts).
|
esockd_transport:setopts(Socket, Opts).
|
||||||
|
|
||||||
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
|
||||||
inet:getstat(Sock, Stats);
|
|
||||||
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
||||||
esockd_transport:getstat(Sock, Stats).
|
esockd_transport:getstat(Sock, Stats);
|
||||||
|
esockd_getstat({udp, _SockPid, _Sock}, Stats) ->
|
||||||
|
{ok, lists:map(fun(K) -> {K, get_stats(K)} end, Stats)}.
|
||||||
|
|
||||||
send(Data, #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) ->
|
send(Data, State = #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) ->
|
||||||
|
incr_send_stats(Data, State),
|
||||||
gen_udp:send(Sock, Ip, Port, Data);
|
gen_udp:send(Sock, Ip, Port, Data);
|
||||||
send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
||||||
esockd_transport:async_send(Sock, Data).
|
esockd_transport:async_send(Sock, Data).
|
||||||
|
|
||||||
|
incr_recv_stats(Data, #state{socket = {udp, _, _}}) ->
|
||||||
|
incr_stats(recv_oct, byte_size(Data)),
|
||||||
|
incr_stats(recv_cnt, 1).
|
||||||
|
|
||||||
|
incr_send_stats(Data, #state{socket = {udp, _, _}}) ->
|
||||||
|
incr_stats(send_oct, byte_size(Data)),
|
||||||
|
incr_stats(send_cnt, 1).
|
||||||
|
|
||||||
|
incr_stats(Key, Cnt) ->
|
||||||
|
Cnt0 = get_stats(Key),
|
||||||
|
put({stats, Key}, Cnt0 + Cnt).
|
||||||
|
|
||||||
|
get_stats(Key) ->
|
||||||
|
case get({stats, Key}) of
|
||||||
|
undefined -> 0;
|
||||||
|
Cnt -> Cnt
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% callbacks
|
%% callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -386,6 +405,7 @@ handle_msg({'$gen_cast', Req}, State) ->
|
||||||
with_channel(handle_cast, [Req], State);
|
with_channel(handle_cast, [Req], State);
|
||||||
|
|
||||||
handle_msg({datagram, _SockPid, Data}, State) ->
|
handle_msg({datagram, _SockPid, Data}, State) ->
|
||||||
|
incr_recv_stats(Data, State),
|
||||||
process_incoming(Data, State);
|
process_incoming(Data, State);
|
||||||
|
|
||||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||||
|
@ -450,11 +470,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
||||||
emqx_cm:connection_closed(ClientId),
|
emqx_cm:connection_closed(ClientId),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
%handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||||
% ClientId = emqx_exproto_channel:info(clientid, Channel),
|
ClientId = emqx_exproto_channel:info(clientid, Channel),
|
||||||
% emqx_cm:set_chan_info(ClientId, info(State)),
|
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||||
% emqx_cm:set_chan_stats(ClientId, stats(State)),
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
% {ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||||
handle_timeout(TRef, TMsg, State);
|
handle_timeout(TRef, TMsg, State);
|
||||||
|
|
|
@ -37,11 +37,18 @@ all() -> emqx_ct:all(?SUITE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
net_kernel:start(['master@127.0.0.1', longnames]),
|
net_kernel:start(['master@127.0.0.1', longnames]),
|
||||||
emqx_ct_helpers:boot_modules(all),
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
PortDiscovery = application:get_env(gen_rpc, port_discovery),
|
||||||
|
application:set_env(gen_rpc, port_discovery, stateless),
|
||||||
|
application:ensure_all_started(gen_rpc),
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
Config.
|
[{port_discovery, PortDiscovery} | Config].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([gen_rpc]),
|
||||||
|
case proplists:get_value(port_discovery, Config) of
|
||||||
|
{ok, OldValue} -> application:set_env(gen_rpc, port_discovery, OldValue);
|
||||||
|
_ -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
t_is_ack_required(_) ->
|
t_is_ack_required(_) ->
|
||||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||||
|
@ -284,7 +291,7 @@ test_two_messages(Strategy, Group) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
last_message(ExpectedPayload, Pids) ->
|
last_message(ExpectedPayload, Pids) ->
|
||||||
last_message(ExpectedPayload, Pids, 100).
|
last_message(ExpectedPayload, Pids, 6000).
|
||||||
|
|
||||||
last_message(ExpectedPayload, Pids, Timeout) ->
|
last_message(ExpectedPayload, Pids, Timeout) ->
|
||||||
receive
|
receive
|
||||||
|
@ -336,7 +343,7 @@ t_per_group_config(_) ->
|
||||||
test_two_messages(round_robin, <<"round_robin_group">>).
|
test_two_messages(round_robin, <<"round_robin_group">>).
|
||||||
|
|
||||||
t_local(_) ->
|
t_local(_) ->
|
||||||
Node = start_slave('local_shared_sub_test', 21884),
|
Node = start_slave('local_shared_sub_test19', 21884),
|
||||||
GroupConfig = #{
|
GroupConfig = #{
|
||||||
<<"local_group_fallback">> => local,
|
<<"local_group_fallback">> => local,
|
||||||
<<"local_group">> => local,
|
<<"local_group">> => local,
|
||||||
|
@ -392,7 +399,7 @@ t_local_fallback(_) ->
|
||||||
Topic = <<"local_foo/bar">>,
|
Topic = <<"local_foo/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
Node = start_slave('local_fallback_shared_sub_test', 11885),
|
Node = start_slave('local_fallback_shared_sub_test19', 11885),
|
||||||
|
|
||||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||||
{ok, _} = emqtt:connect(ConnPid1),
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
@ -401,10 +408,10 @@ t_local_fallback(_) ->
|
||||||
|
|
||||||
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}),
|
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}),
|
||||||
|
|
||||||
emqx:publish(Message1),
|
[{share, <<"local_foo/bar">>, {ok, 1}}] = emqx:publish(Message1),
|
||||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
||||||
|
|
||||||
rpc:call(Node, emqx, publish, [Message2]),
|
[{share, <<"local_foo/bar">>, {ok, 1}}] = rpc:call(Node, emqx, publish, [Message2]),
|
||||||
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
|
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
|
||||||
|
|
||||||
emqtt:stop(ConnPid1),
|
emqtt:stop(ConnPid1),
|
||||||
|
@ -537,7 +544,7 @@ start_slave(Name, Port) ->
|
||||||
{erl_flags, ebin_path()}]),
|
{erl_flags, ebin_path()}]),
|
||||||
|
|
||||||
pong = net_adm:ping(Node),
|
pong = net_adm:ping(Node),
|
||||||
setup_node(Node, Port),
|
ok = setup_node(Node, Port),
|
||||||
Node.
|
Node.
|
||||||
|
|
||||||
stop_slave(Node) ->
|
stop_slave(Node) ->
|
||||||
|
@ -563,8 +570,7 @@ setup_node(Node, Port) ->
|
||||||
name => "internal",
|
name => "internal",
|
||||||
opts => [{zone,internal}],
|
opts => [{zone,internal}],
|
||||||
proto => tcp}]),
|
proto => tcp}]),
|
||||||
application:set_env(gen_rpc, port_discovery, manual),
|
application:set_env(gen_rpc, port_discovery, stateless),
|
||||||
application:set_env(gen_rpc, tcp_server_port, Port * 2),
|
|
||||||
ok;
|
ok;
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
|
@ -572,7 +578,13 @@ setup_node(Node, Port) ->
|
||||||
|
|
||||||
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
||||||
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]),
|
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]),
|
||||||
|
|
||||||
rpc:call(Node, ekka, join, [node()]),
|
rpc:call(Node, ekka, join, [node()]),
|
||||||
|
|
||||||
|
%% Sanity check. Assert that `gen_rpc' is set up correctly:
|
||||||
|
?assertEqual( Node
|
||||||
|
, gen_rpc:call(Node, erlang, node, [])
|
||||||
|
),
|
||||||
|
?assertEqual( node()
|
||||||
|
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue