Merge branch 'main-v4.3' into auth-mnesia-default-import
This commit is contained in:
commit
729ab6f60d
|
@ -3,23 +3,27 @@
|
||||||
[
|
[
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
|
@ -27,6 +31,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
|
@ -35,6 +40,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
||||||
|
@ -43,6 +49,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
@ -55,23 +62,27 @@
|
||||||
[
|
[
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
|
@ -79,6 +90,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
|
@ -87,6 +99,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}
|
||||||
|
@ -95,6 +108,7 @@
|
||||||
]},
|
]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}}
|
[ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}}
|
||||||
|
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||||
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
, on_message_dropped/4
|
, on_message_dropped/4
|
||||||
, on_message_delivered/3
|
, on_message_delivered/3
|
||||||
, on_message_acked/3
|
, on_message_acked/3
|
||||||
|
, on_delivery_dropped/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ event_info/0
|
-export([ event_info/0
|
||||||
|
@ -53,6 +54,7 @@
|
||||||
, 'message.delivered'
|
, 'message.delivered'
|
||||||
, 'message.acked'
|
, 'message.acked'
|
||||||
, 'message.dropped'
|
, 'message.dropped'
|
||||||
|
, 'delivery.dropped'
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
@ -136,6 +138,14 @@ on_message_acked(ClientInfo, Message, Env) ->
|
||||||
fun() -> eventmsg_acked(ClientInfo, Message) end, Env),
|
fun() -> eventmsg_acked(ClientInfo, Message) end, Env),
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
|
on_delivery_dropped(_ClientInfo, Message = #message{flags = #{sys := true}},
|
||||||
|
_Reason, #{ignore_sys_message := true}) ->
|
||||||
|
{ok, Message};
|
||||||
|
on_delivery_dropped(ClientInfo, Message, Reason, Env) ->
|
||||||
|
may_publish_and_apply('delivery.dropped',
|
||||||
|
fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env),
|
||||||
|
{ok, Message}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Event Messages
|
%% Event Messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -242,6 +252,32 @@ eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags =
|
||||||
publish_received_at => Timestamp
|
publish_received_at => Timestamp
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
eventmsg_delivery_dropped(_ClientInfo = #{
|
||||||
|
peerhost := PeerHost,
|
||||||
|
clientid := ReceiverCId,
|
||||||
|
username := ReceiverUsername
|
||||||
|
},
|
||||||
|
Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic,
|
||||||
|
headers = Headers, payload = Payload, timestamp = Timestamp},
|
||||||
|
Reason) ->
|
||||||
|
with_basic_columns('delivery.dropped',
|
||||||
|
#{id => emqx_guid:to_hexstr(Id),
|
||||||
|
reason => Reason,
|
||||||
|
from_clientid => ClientId,
|
||||||
|
from_username => emqx_message:get_header(username, Message, undefined),
|
||||||
|
clientid => ReceiverCId,
|
||||||
|
username => ReceiverUsername,
|
||||||
|
payload => Payload,
|
||||||
|
peerhost => ntoa(PeerHost),
|
||||||
|
topic => Topic,
|
||||||
|
qos => QoS,
|
||||||
|
flags => Flags,
|
||||||
|
%% the column 'headers' will be removed in the next major release
|
||||||
|
headers => printable_maps(Headers),
|
||||||
|
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||||
|
publish_received_at => Timestamp
|
||||||
|
}).
|
||||||
|
|
||||||
eventmsg_delivered(_ClientInfo = #{
|
eventmsg_delivered(_ClientInfo = #{
|
||||||
peerhost := PeerHost,
|
peerhost := PeerHost,
|
||||||
clientid := ReceiverCId,
|
clientid := ReceiverCId,
|
||||||
|
@ -333,6 +369,7 @@ event_info() ->
|
||||||
, event_info_message_deliver()
|
, event_info_message_deliver()
|
||||||
, event_info_message_acked()
|
, event_info_message_acked()
|
||||||
, event_info_message_dropped()
|
, event_info_message_dropped()
|
||||||
|
, event_info_delivery_dropped()
|
||||||
, event_info_client_connected()
|
, event_info_client_connected()
|
||||||
, event_info_client_disconnected()
|
, event_info_client_disconnected()
|
||||||
, event_info_session_subscribed()
|
, event_info_session_subscribed()
|
||||||
|
@ -363,10 +400,19 @@ event_info_message_acked() ->
|
||||||
event_info_message_dropped() ->
|
event_info_message_dropped() ->
|
||||||
event_info_common(
|
event_info_common(
|
||||||
'message.dropped',
|
'message.dropped',
|
||||||
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
|
{<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
|
||||||
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
|
{<<"messages are discarded during forwarding, usually because there are no subscribers">>,
|
||||||
|
<<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
|
||||||
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
|
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
|
||||||
).
|
).
|
||||||
|
event_info_delivery_dropped() ->
|
||||||
|
event_info_common(
|
||||||
|
'delivery.dropped',
|
||||||
|
{<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
|
||||||
|
{<<"messages are discarded during delivery, i.e. because the message queue is full">>,
|
||||||
|
<<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
|
||||||
|
<<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
|
||||||
|
).
|
||||||
event_info_client_connected() ->
|
event_info_client_connected() ->
|
||||||
event_info_common(
|
event_info_common(
|
||||||
'client.connected',
|
'client.connected',
|
||||||
|
@ -406,7 +452,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
test_columns('message.dropped') ->
|
test_columns('message.dropped') ->
|
||||||
test_columns('message.publish');
|
[ {<<"reason">>, <<"no_subscribers">>}
|
||||||
|
] ++ test_columns('message.publish');
|
||||||
test_columns('message.publish') ->
|
test_columns('message.publish') ->
|
||||||
[ {<<"clientid">>, <<"c_emqx">>}
|
[ {<<"clientid">>, <<"c_emqx">>}
|
||||||
, {<<"username">>, <<"u_emqx">>}
|
, {<<"username">>, <<"u_emqx">>}
|
||||||
|
@ -425,6 +472,9 @@ test_columns('message.delivered') ->
|
||||||
, {<<"qos">>, 1}
|
, {<<"qos">>, 1}
|
||||||
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
||||||
];
|
];
|
||||||
|
test_columns('delivery.dropped') ->
|
||||||
|
[ {<<"reason">>, <<"queue_full">>}
|
||||||
|
] ++ test_columns('message.delivered');
|
||||||
test_columns('client.connected') ->
|
test_columns('client.connected') ->
|
||||||
[ {<<"clientid">>, <<"c_emqx">>}
|
[ {<<"clientid">>, <<"c_emqx">>}
|
||||||
, {<<"username">>, <<"u_emqx">>}
|
, {<<"username">>, <<"u_emqx">>}
|
||||||
|
@ -506,6 +556,23 @@ columns_with_exam('message.dropped') ->
|
||||||
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
||||||
, {<<"node">>, node()}
|
, {<<"node">>, node()}
|
||||||
];
|
];
|
||||||
|
columns_with_exam('delivery.dropped') ->
|
||||||
|
[ {<<"event">>, 'delivery.dropped'}
|
||||||
|
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
||||||
|
, {<<"reason">>, queue_full}
|
||||||
|
, {<<"from_clientid">>, <<"c_emqx_1">>}
|
||||||
|
, {<<"from_username">>, <<"u_emqx_1">>}
|
||||||
|
, {<<"clientid">>, <<"c_emqx_2">>}
|
||||||
|
, {<<"username">>, <<"u_emqx_2">>}
|
||||||
|
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
||||||
|
, {<<"peerhost">>, <<"192.168.0.10">>}
|
||||||
|
, {<<"topic">>, <<"t/a">>}
|
||||||
|
, {<<"qos">>, 1}
|
||||||
|
, {<<"flags">>, #{}}
|
||||||
|
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
||||||
|
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
||||||
|
, {<<"node">>, node()}
|
||||||
|
];
|
||||||
columns_with_exam('client.connected') ->
|
columns_with_exam('client.connected') ->
|
||||||
[ {<<"event">>, 'client.connected'}
|
[ {<<"event">>, 'client.connected'}
|
||||||
, {<<"clientid">>, <<"c_emqx">>}
|
, {<<"clientid">>, <<"c_emqx">>}
|
||||||
|
@ -594,6 +661,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) ->
|
||||||
event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
|
event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
|
||||||
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
|
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
|
||||||
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
|
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
|
||||||
|
event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
|
||||||
event_name(_) -> 'message.publish'.
|
event_name(_) -> 'message.publish'.
|
||||||
|
|
||||||
event_topic('client.connected') -> <<"$events/client_connected">>;
|
event_topic('client.connected') -> <<"$events/client_connected">>;
|
||||||
|
@ -603,6 +671,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
|
||||||
event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
||||||
event_topic('message.acked') -> <<"$events/message_acked">>;
|
event_topic('message.acked') -> <<"$events/message_acked">>;
|
||||||
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
||||||
|
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
|
||||||
event_topic('message.publish') -> <<"$events/message_publish">>.
|
event_topic('message.publish') -> <<"$events/message_publish">>.
|
||||||
|
|
||||||
printable_maps(undefined) -> #{};
|
printable_maps(undefined) -> #{};
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.4"}, % strict semver, bump manually!
|
{vsn, "4.3.5"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,14 +1,30 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
|
[
|
||||||
{"4.3.2",
|
{"4.3.4",[
|
||||||
[{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{"4.3.3",[
|
||||||
[{restart_application,emqx_sn}]}],
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
[{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
{"4.3.2",
|
]},
|
||||||
[{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]},
|
{"4.3.2", [
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
{<<"4\\.3\\.[0-1]">>,
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
[{restart_application,emqx_sn}]}]}.
|
]},
|
||||||
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{"4.3.4",[
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.3",[
|
||||||
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{"4.3.2", [
|
||||||
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
]}.
|
||||||
|
|
|
@ -408,7 +408,8 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)},
|
||||||
inc_ping_counter(),
|
inc_ping_counter(),
|
||||||
case ClientIdPing of
|
case ClientIdPing of
|
||||||
ClientId ->
|
ClientId ->
|
||||||
case emqx_session:replay(emqx_channel:get_session(Channel)) of
|
case emqx_session:replay(emqx_channel:info(clientinfo, Channel),
|
||||||
|
emqx_channel:get_session(Channel)) of
|
||||||
{ok, [], Session0} ->
|
{ok, [], Session0} ->
|
||||||
State0 = send_message(?SN_PINGRESP_MSG(), State),
|
State0 = send_message(?SN_PINGRESP_MSG(), State),
|
||||||
{keep_state, State0#state{
|
{keep_state, State0#state{
|
||||||
|
@ -521,7 +522,8 @@ handle_event(info, {deliver, _Topic, Msg}, asleep,
|
||||||
% section 6.14, Support of sleeping clients
|
% section 6.14, Support of sleeping clients
|
||||||
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
|
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
|
||||||
[Msg, Pendings]),
|
[Msg, Pendings]),
|
||||||
Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)),
|
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
|
||||||
|
Msg, emqx_channel:get_session(Channel)),
|
||||||
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
||||||
|
|
||||||
handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,
|
handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName,
|
||||||
|
|
2
build
2
build
|
@ -65,7 +65,7 @@ make_relup() {
|
||||||
if [ -d "$releases_dir" ]; then
|
if [ -d "$releases_dir" ]; then
|
||||||
while read -r zip; do
|
while read -r zip; do
|
||||||
local base_vsn
|
local base_vsn
|
||||||
base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")"
|
base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-f]{8})?")"
|
||||||
if [ ! -d "$releases_dir/$base_vsn" ]; then
|
if [ ! -d "$releases_dir/$base_vsn" ]; then
|
||||||
local tmp_dir
|
local tmp_dir
|
||||||
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
|
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
||||||
|
|
||||||
{dialyzer, [
|
{dialyzer, [
|
||||||
{warnings, [unmatched_returns, error_handling, race_conditions]},
|
{warnings, [unmatched_returns, error_handling]},
|
||||||
{plt_location, "."},
|
{plt_location, "."},
|
||||||
{plt_prefix, "emqx_dialyzer"},
|
{plt_prefix, "emqx_dialyzer"},
|
||||||
{plt_apps, all_apps},
|
{plt_apps, all_apps},
|
||||||
|
|
|
@ -1,22 +1,31 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.12",
|
[{"4.3.12",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}]},
|
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.10",
|
{"4.3.10",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.9",
|
{"4.3.9",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -24,12 +33,15 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -37,12 +49,15 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -50,6 +65,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
@ -58,6 +75,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -65,6 +83,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
@ -74,6 +94,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -81,6 +102,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
@ -91,6 +114,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -98,6 +122,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
@ -109,6 +135,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -116,6 +143,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||||
|
@ -128,6 +157,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -139,6 +169,8 @@
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||||
|
@ -147,6 +179,7 @@
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -162,6 +195,8 @@
|
||||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
|
@ -170,6 +205,7 @@
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -189,6 +225,7 @@
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
@ -197,25 +234,35 @@
|
||||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.12",
|
[{"4.3.12",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}]},
|
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||||
|
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.10",
|
{"4.3.10",
|
||||||
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.9",
|
{"4.3.9",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -223,12 +270,15 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -236,12 +286,15 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -249,6 +302,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -257,6 +312,7 @@
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -264,6 +320,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -273,6 +331,7 @@
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -280,6 +339,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -290,6 +351,7 @@
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -297,6 +359,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
|
@ -308,6 +372,7 @@
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -315,6 +380,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
|
@ -327,6 +394,7 @@
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -340,12 +408,15 @@
|
||||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -363,12 +434,15 @@
|
||||||
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
{load_module,emqx_congestion,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_vm,brutal_purge,soft_purge,[]},
|
||||||
|
@ -386,6 +460,7 @@
|
||||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
{load_module,emqx_node_dump,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
@ -395,5 +470,6 @@
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rpc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -375,6 +375,10 @@ normalize_message(high_cpu_usage, #{usage := Usage}) ->
|
||||||
list_to_binary(io_lib:format("~p% cpu usage", [Usage]));
|
list_to_binary(io_lib:format("~p% cpu usage", [Usage]));
|
||||||
normalize_message(too_many_processes, #{usage := Usage}) ->
|
normalize_message(too_many_processes, #{usage := Usage}) ->
|
||||||
list_to_binary(io_lib:format("~p% process usage", [Usage]));
|
list_to_binary(io_lib:format("~p% process usage", [Usage]));
|
||||||
|
normalize_message(license_quota, #{high_watermark := High}) ->
|
||||||
|
iolist_to_binary(["License: the number of connections exceeds ", High, "%"]);
|
||||||
|
normalize_message(license_expiry, #{expiry_at := ExpiryAt}) ->
|
||||||
|
iolist_to_binary(["License will be expired at ", ExpiryAt]);
|
||||||
normalize_message(partition, #{occurred := Node}) ->
|
normalize_message(partition, #{occurred := Node}) ->
|
||||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||||
|
|
|
@ -342,7 +342,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
||||||
|
|
||||||
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
= #channel{clientinfo = ClientInfo, session = Session}) ->
|
||||||
case emqx_session:puback(PacketId, Session) of
|
case emqx_session:puback(ClientInfo, PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
|
@ -387,8 +387,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
|
||||||
handle_out(pubcomp, {PacketId, RC}, Channel)
|
handle_out(pubcomp, {PacketId, RC}, Channel)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{
|
||||||
case emqx_session:pubcomp(PacketId, Session) of
|
clientinfo = ClientInfo, session = Session}) ->
|
||||||
|
case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
|
@ -720,27 +721,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
|
|
||||||
-spec(handle_deliver(list(emqx_types:deliver()), channel())
|
-spec(handle_deliver(list(emqx_types:deliver()), channel())
|
||||||
-> {ok, channel()} | {ok, replies(), channel()}).
|
-> {ok, channel()} | {ok, replies(), channel()}).
|
||||||
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
handle_deliver(Delivers, Channel = #channel{
|
||||||
|
takeover = true,
|
||||||
pendings = Pendings,
|
pendings = Pendings,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
|
||||||
%% NOTE: Order is important here. While the takeover is in
|
%% NOTE: Order is important here. While the takeover is in
|
||||||
%% progress, the session cannot enqueue messages, since it already
|
%% progress, the session cannot enqueue messages, since it already
|
||||||
%% passed on the queue to the new connection in the session state.
|
%% passed on the queue to the new connection in the session state.
|
||||||
NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
|
NPendings = lists:append(Pendings,
|
||||||
|
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
|
||||||
{ok, Channel#channel{pendings = NPendings}};
|
{ok, Channel#channel{pendings = NPendings}};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
handle_deliver(Delivers, Channel = #channel{
|
||||||
|
conn_state = disconnected,
|
||||||
takeover = false,
|
takeover = false,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
|
||||||
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
|
NSession = emqx_session:enqueue(ClientInfo,
|
||||||
|
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{session = Session,
|
handle_deliver(Delivers, Channel = #channel{
|
||||||
|
session = Session,
|
||||||
takeover = false,
|
takeover = false,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
|
||||||
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
|
case emqx_session:deliver(ClientInfo,
|
||||||
|
ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = Channel#channel{session = NSession},
|
||||||
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
||||||
|
@ -748,11 +755,12 @@ handle_deliver(Delivers, Channel = #channel{session = Session,
|
||||||
{ok, Channel#channel{session = NSession}}
|
{ok, Channel#channel{session = NSession}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ignore_local(Delivers, Subscriber, Session) ->
|
ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
|
||||||
Subs = emqx_session:info(subscriptions, Session),
|
Subs = emqx_session:info(subscriptions, Session),
|
||||||
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
|
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) ->
|
||||||
case maps:find(Topic, Subs) of
|
case maps:find(Topic, Subs) of
|
||||||
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
|
||||||
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
|
||||||
ok = emqx_metrics:inc('delivery.dropped'),
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.no_local'),
|
ok = emqx_metrics:inc('delivery.dropped.no_local'),
|
||||||
true;
|
true;
|
||||||
|
@ -1025,8 +1033,8 @@ handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{conn_state = disconnected}) ->
|
Channel = #channel{conn_state = disconnected}) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
handle_timeout(_TRef, retry_delivery,
|
handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(ClientInfo, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||||
{ok, Publishes, Timeout, NSession} ->
|
{ok, Publishes, Timeout, NSession} ->
|
||||||
|
@ -1588,9 +1596,10 @@ maybe_resume_session(#channel{resuming = false}) ->
|
||||||
ignore;
|
ignore;
|
||||||
maybe_resume_session(#channel{session = Session,
|
maybe_resume_session(#channel{session = Session,
|
||||||
resuming = true,
|
resuming = true,
|
||||||
pendings = Pendings}) ->
|
pendings = Pendings,
|
||||||
{ok, Publishes, Session1} = emqx_session:replay(Session),
|
clientinfo = ClientInfo}) ->
|
||||||
case emqx_session:deliver(Pendings, Session1) of
|
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
||||||
|
case emqx_session:deliver(ClientInfo, Pendings, Session1) of
|
||||||
{ok, Session2} ->
|
{ok, Session2} ->
|
||||||
{ok, Publishes, Session2};
|
{ok, Publishes, Session2};
|
||||||
{ok, More, Session2} ->
|
{ok, More, Session2} ->
|
||||||
|
|
|
@ -146,7 +146,7 @@
|
||||||
%% PubSub Metrics
|
%% PubSub Metrics
|
||||||
{counter, 'messages.publish'}, % Messages Publish
|
{counter, 'messages.publish'}, % Messages Publish
|
||||||
{counter, 'messages.dropped'}, % Messages dropped due to no subscribers
|
{counter, 'messages.dropped'}, % Messages dropped due to no subscribers
|
||||||
{counter, 'messages.dropped.expired'}, % QoS2 Messages expired
|
{counter, 'messages.dropped.await_pubrel_timeout'}, % QoS2 await PUBREL timeout
|
||||||
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
|
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
|
||||||
{counter, 'messages.forward'}, % Messages forward
|
{counter, 'messages.forward'}, % Messages forward
|
||||||
{counter, 'messages.retained'}, % Messages retained
|
{counter, 'messages.retained'}, % Messages retained
|
||||||
|
@ -542,7 +542,8 @@ reserved_idx('messages.qos2.received') -> 106;
|
||||||
reserved_idx('messages.qos2.sent') -> 107;
|
reserved_idx('messages.qos2.sent') -> 107;
|
||||||
reserved_idx('messages.publish') -> 108;
|
reserved_idx('messages.publish') -> 108;
|
||||||
reserved_idx('messages.dropped') -> 109;
|
reserved_idx('messages.dropped') -> 109;
|
||||||
reserved_idx('messages.dropped.expired') -> 110;
|
reserved_idx('messages.dropped.expired') -> 110; %% To be removed in 5.0
|
||||||
|
reserved_idx('messages.dropped.await_pubrel_timeout') -> 110;
|
||||||
reserved_idx('messages.dropped.no_subscribers') -> 111;
|
reserved_idx('messages.dropped.no_subscribers') -> 111;
|
||||||
reserved_idx('messages.forward') -> 112;
|
reserved_idx('messages.forward') -> 112;
|
||||||
reserved_idx('messages.retained') -> 113;
|
reserved_idx('messages.retained') -> 113;
|
||||||
|
|
|
@ -67,22 +67,22 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ publish/3
|
-export([ publish/3
|
||||||
, puback/2
|
, puback/3
|
||||||
, pubrec/2
|
, pubrec/2
|
||||||
, pubrel/2
|
, pubrel/2
|
||||||
, pubcomp/2
|
, pubcomp/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ deliver/2
|
-export([ deliver/3
|
||||||
, enqueue/2
|
, enqueue/3
|
||||||
, dequeue/1
|
, dequeue/2
|
||||||
, retry/1
|
, retry/2
|
||||||
, terminate/3
|
, terminate/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ takeover/1
|
-export([ takeover/1
|
||||||
, resume/2
|
, resume/2
|
||||||
, replay/1
|
, replay/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([expire/2]).
|
-export([expire/2]).
|
||||||
|
@ -312,15 +312,15 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
||||||
%% Client -> Broker: PUBACK
|
%% Client -> Broker: PUBACK
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(puback(emqx_types:packet_id(), session())
|
-spec(puback(emqx_types:clientinfo(), emqx_types:packet_id(), session())
|
||||||
-> {ok, emqx_types:message(), session()}
|
-> {ok, emqx_types:message(), session()}
|
||||||
| {ok, emqx_types:message(), replies(), session()}
|
| {ok, emqx_types:message(), replies(), session()}
|
||||||
| {error, emqx_types:reason_code()}).
|
| {error, emqx_types:reason_code()}).
|
||||||
puback(PacketId, Session = #session{inflight = Inflight}) ->
|
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
|
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
|
||||||
{value, {_Pubrel, _Ts}} ->
|
{value, {_Pubrel, _Ts}} ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
||||||
none ->
|
none ->
|
||||||
|
@ -369,14 +369,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
|
||||||
%% Client -> Broker: PUBCOMP
|
%% Client -> Broker: PUBCOMP
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(pubcomp(emqx_types:packet_id(), session())
|
-spec(pubcomp(emqx_types:clientinfo(), emqx_types:packet_id(), session())
|
||||||
-> {ok, session()} | {ok, replies(), session()}
|
-> {ok, session()} | {ok, replies(), session()}
|
||||||
| {error, emqx_types:reason_code()}).
|
| {error, emqx_types:reason_code()}).
|
||||||
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
|
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {pubrel, _Ts}} ->
|
{value, {pubrel, _Ts}} ->
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
dequeue(Session#session{inflight = Inflight1});
|
dequeue(ClientInfo, Session#session{inflight = Inflight1});
|
||||||
{value, _Other} ->
|
{value, _Other} ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
||||||
none ->
|
none ->
|
||||||
|
@ -387,25 +387,27 @@ pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
%% Dequeue Msgs
|
%% Dequeue Msgs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
|
dequeue(ClientInfo, Session = #session{inflight = Inflight, mqueue = Q}) ->
|
||||||
case emqx_mqueue:is_empty(Q) of
|
case emqx_mqueue:is_empty(Q) of
|
||||||
true -> {ok, Session};
|
true -> {ok, Session};
|
||||||
false ->
|
false ->
|
||||||
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
|
{Msgs, Q1} = dequeue(ClientInfo, batch_n(Inflight), [], Q),
|
||||||
deliver(Msgs, [], Session#session{mqueue = Q1})
|
do_deliver(ClientInfo, Msgs, [], Session#session{mqueue = Q1})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dequeue(0, Msgs, Q) ->
|
dequeue(_ClientInfo, 0, Msgs, Q) ->
|
||||||
{lists:reverse(Msgs), Q};
|
{lists:reverse(Msgs), Q};
|
||||||
|
|
||||||
dequeue(Cnt, Msgs, Q) ->
|
dequeue(ClientInfo, Cnt, Msgs, Q) ->
|
||||||
case emqx_mqueue:out(Q) of
|
case emqx_mqueue:out(Q) of
|
||||||
{empty, _Q} -> dequeue(0, Msgs, Q);
|
{empty, _Q} -> dequeue(ClientInfo, 0, Msgs, Q);
|
||||||
{{value, Msg}, Q1} ->
|
{{value, Msg}, Q1} ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true -> ok = inc_expired_cnt(delivery),
|
true ->
|
||||||
dequeue(Cnt, Msgs, Q1);
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||||
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
|
ok = inc_delivery_expired_cnt(),
|
||||||
|
dequeue(ClientInfo, Cnt, Msgs, Q1);
|
||||||
|
false -> dequeue(ClientInfo, acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -417,38 +419,38 @@ acc_cnt(_Msg, Cnt) -> Cnt - 1.
|
||||||
%% Broker -> Client: Deliver
|
%% Broker -> Client: Deliver
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(deliver(list(emqx_types:deliver()), session())
|
-spec(deliver(emqx_types:clientinfo(), list(emqx_types:deliver()), session())
|
||||||
-> {ok, session()} | {ok, replies(), session()}).
|
-> {ok, session()} | {ok, replies(), session()}).
|
||||||
deliver([Deliver], Session) -> %% Optimize
|
deliver(ClientInfo, [Deliver], Session) -> %% Optimize
|
||||||
Enrich = enrich_fun(Session),
|
Msg = enrich_delivers(Deliver, Session),
|
||||||
deliver_msg(Enrich(Deliver), Session);
|
deliver_msg(ClientInfo, Msg, Session);
|
||||||
|
|
||||||
deliver(Delivers, Session) ->
|
deliver(ClientInfo, Delivers, Session) ->
|
||||||
Msgs = lists:map(enrich_fun(Session), Delivers),
|
Msgs = [enrich_delivers(D, Session) || D <- Delivers],
|
||||||
deliver(Msgs, [], Session).
|
do_deliver(ClientInfo, Msgs, [], Session).
|
||||||
|
|
||||||
deliver([], Publishes, Session) ->
|
do_deliver(_ClientInfo, [], Publishes, Session) ->
|
||||||
{ok, lists:reverse(Publishes), Session};
|
{ok, lists:reverse(Publishes), Session};
|
||||||
|
|
||||||
deliver([Msg | More], Acc, Session) ->
|
do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
|
||||||
case deliver_msg(Msg, Session) of
|
case deliver_msg(ClientInfo, Msg, Session) of
|
||||||
{ok, Session1} ->
|
{ok, Session1} ->
|
||||||
deliver(More, Acc, Session1);
|
do_deliver(ClientInfo, More, Acc, Session1);
|
||||||
{ok, [Publish], Session1} ->
|
{ok, [Publish], Session1} ->
|
||||||
deliver(More, [Publish|Acc], Session1)
|
do_deliver(ClientInfo, More, [Publish|Acc], Session1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
|
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
||||||
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
||||||
|
|
||||||
deliver_msg(Msg = #message{qos = QoS}, Session =
|
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
||||||
#session{next_pkt_id = PacketId, inflight = Inflight})
|
#session{next_pkt_id = PacketId, inflight = Inflight})
|
||||||
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
||||||
case emqx_inflight:is_full(Inflight) of
|
case emqx_inflight:is_full(Inflight) of
|
||||||
true ->
|
true ->
|
||||||
Session1 = case maybe_nack(Msg) of
|
Session1 = case maybe_nack(Msg) of
|
||||||
true -> Session;
|
true -> Session;
|
||||||
false -> enqueue(Msg, Session)
|
false -> enqueue(ClientInfo, Msg, Session)
|
||||||
end,
|
end,
|
||||||
{ok, Session1};
|
{ok, Session1};
|
||||||
false ->
|
false ->
|
||||||
|
@ -457,36 +459,36 @@ deliver_msg(Msg = #message{qos = QoS}, Session =
|
||||||
{ok, [Publish], next_pkt_id(Session1)}
|
{ok, [Publish], next_pkt_id(Session1)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
|
-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
|
||||||
session()) -> session()).
|
session()) -> session()).
|
||||||
enqueue([Deliver], Session) -> %% Optimize
|
enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) ->
|
||||||
Enrich = enrich_fun(Session),
|
Msgs = [enrich_delivers(D, Session) || D <- Delivers],
|
||||||
enqueue(Enrich(Deliver), Session);
|
lists:foldl(fun(Msg, Session0) ->
|
||||||
|
enqueue(ClientInfo, Msg, Session0)
|
||||||
|
end, Session, Msgs);
|
||||||
|
|
||||||
enqueue(Delivers, Session) when is_list(Delivers) ->
|
enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) ->
|
||||||
Msgs = lists:map(enrich_fun(Session), Delivers),
|
|
||||||
lists:foldl(fun enqueue/2, Session, Msgs);
|
|
||||||
|
|
||||||
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
|
|
||||||
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
||||||
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
|
(Dropped =/= undefined) andalso log_dropped(ClientInfo, Dropped, Session),
|
||||||
Session#session{mqueue = NewQ}.
|
Session#session{mqueue = NewQ}.
|
||||||
|
|
||||||
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
log_dropped(ClientInfo, Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
||||||
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
||||||
true ->
|
true ->
|
||||||
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
|
||||||
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||||
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
|
?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
|
||||||
false ->
|
false ->
|
||||||
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
|
||||||
|
ok = emqx_metrics:inc('delivery.dropped'),
|
||||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||||
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
|
||||||
[emqx_message:format(Msg)])
|
[emqx_message:format(Msg)])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enrich_fun(Session = #session{subscriptions = Subs}) ->
|
enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) ->
|
||||||
fun({deliver, Topic, Msg}) ->
|
enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
|
||||||
enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
|
|
||||||
end.
|
|
||||||
|
|
||||||
maybe_ack(Msg) ->
|
maybe_ack(Msg) ->
|
||||||
case emqx_shared_sub:is_ack_required(Msg) of
|
case emqx_shared_sub:is_ack_required(Msg) of
|
||||||
|
@ -541,31 +543,33 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
||||||
%% Retry Delivery
|
%% Retry Delivery
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
|
-spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
|
||||||
retry(Session = #session{inflight = Inflight}) ->
|
retry(ClientInfo, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:is_empty(Inflight) of
|
case emqx_inflight:is_empty(Inflight) of
|
||||||
true -> {ok, Session};
|
true -> {ok, Session};
|
||||||
false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
|
false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
|
||||||
[], erlang:system_time(millisecond), Session)
|
[], erlang:system_time(millisecond), Session, ClientInfo)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
|
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
|
||||||
{ok, lists:reverse(Acc), Interval, Session};
|
{ok, lists:reverse(Acc), Interval, Session};
|
||||||
|
|
||||||
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
|
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
|
||||||
#session{retry_interval = Interval, inflight = Inflight}) ->
|
#session{retry_interval = Interval, inflight = Inflight}, ClientInfo) ->
|
||||||
case (Age = age(Now, Ts)) >= Interval of
|
case (Age = age(Now, Ts)) >= Interval of
|
||||||
true ->
|
true ->
|
||||||
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
|
{Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo),
|
||||||
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
|
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo);
|
||||||
false ->
|
false ->
|
||||||
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
|
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
|
do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo)
|
||||||
|
when is_record(Msg, message) ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true ->
|
true ->
|
||||||
ok = inc_expired_cnt(delivery),
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||||
|
ok = inc_delivery_expired_cnt(),
|
||||||
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
||||||
false ->
|
false ->
|
||||||
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
||||||
|
@ -573,7 +577,7 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
|
||||||
{[{PacketId, Msg1}|Acc], Inflight1}
|
{[{PacketId, Msg1}|Acc], Inflight1}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
|
do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) ->
|
||||||
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
|
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
|
||||||
{[{pubrel, PacketId}|Acc], Inflight1}.
|
{[{pubrel, PacketId}|Acc], Inflight1}.
|
||||||
|
|
||||||
|
@ -593,7 +597,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
||||||
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
|
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
|
||||||
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
|
AwaitingRel1 = maps:filter(NotExpired, AwaitingRel),
|
||||||
ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
|
ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1),
|
||||||
(ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt),
|
(ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt),
|
||||||
NSession = Session#session{awaiting_rel = AwaitingRel1},
|
NSession = Session#session{awaiting_rel = AwaitingRel1},
|
||||||
case maps:size(AwaitingRel1) of
|
case maps:size(AwaitingRel1) of
|
||||||
0 -> {ok, NSession};
|
0 -> {ok, NSession};
|
||||||
|
@ -616,14 +620,14 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
|
||||||
ok = emqx_metrics:inc('session.resumed'),
|
ok = emqx_metrics:inc('session.resumed'),
|
||||||
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
|
emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]).
|
||||||
|
|
||||||
-spec(replay(session()) -> {ok, replies(), session()}).
|
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
|
||||||
replay(Session = #session{inflight = Inflight}) ->
|
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
||||||
Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
|
Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
|
||||||
{pubrel, PacketId};
|
{pubrel, PacketId};
|
||||||
({PacketId, {Msg, _Ts}}) ->
|
({PacketId, {Msg, _Ts}}) ->
|
||||||
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
||||||
end, emqx_inflight:to_list(Inflight)),
|
end, emqx_inflight:to_list(Inflight)),
|
||||||
case dequeue(Session) of
|
case dequeue(ClientInfo, Session) of
|
||||||
{ok, NSession} -> {ok, Pubs, NSession};
|
{ok, NSession} -> {ok, Pubs, NSession};
|
||||||
{ok, More, NSession} ->
|
{ok, More, NSession} ->
|
||||||
{ok, lists:append(Pubs, More), NSession}
|
{ok, lists:append(Pubs, More), NSession}
|
||||||
|
@ -644,18 +648,16 @@ run_hook(Name, Args) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Inc message/delivery expired counter
|
%% Inc message/delivery expired counter
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
inc_delivery_expired_cnt() ->
|
||||||
|
inc_delivery_expired_cnt(1).
|
||||||
|
|
||||||
-compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}).
|
inc_delivery_expired_cnt(N) ->
|
||||||
|
|
||||||
inc_expired_cnt(K) -> inc_expired_cnt(K, 1).
|
|
||||||
|
|
||||||
inc_expired_cnt(delivery, N) ->
|
|
||||||
ok = emqx_metrics:inc('delivery.dropped', N),
|
ok = emqx_metrics:inc('delivery.dropped', N),
|
||||||
emqx_metrics:inc('delivery.dropped.expired', N);
|
emqx_metrics:inc('delivery.dropped.expired', N).
|
||||||
|
|
||||||
inc_expired_cnt(message, N) ->
|
inc_await_pubrel_timeout(N) ->
|
||||||
ok = emqx_metrics:inc('messages.dropped', N),
|
ok = emqx_metrics:inc('messages.dropped', N),
|
||||||
emqx_metrics:inc('messages.dropped.expired', N).
|
emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Next Packet Id
|
%% Next Packet Id
|
||||||
|
|
|
@ -80,15 +80,16 @@ t_chan_info(_) ->
|
||||||
?assertEqual(clientinfo(), ClientInfo).
|
?assertEqual(clientinfo(), ClientInfo).
|
||||||
|
|
||||||
t_chan_caps(_) ->
|
t_chan_caps(_) ->
|
||||||
#{max_clientid_len := 65535,
|
?assertMatch(#{
|
||||||
|
max_clientid_len := 65535,
|
||||||
max_qos_allowed := 2,
|
max_qos_allowed := 2,
|
||||||
max_topic_alias := 65535,
|
max_topic_alias := 65535,
|
||||||
max_topic_levels := 128,
|
max_topic_levels := Level,
|
||||||
retain_available := true,
|
retain_available := true,
|
||||||
shared_subscription := true,
|
shared_subscription := true,
|
||||||
subscription_identifiers := true,
|
subscription_identifiers := true,
|
||||||
wildcard_subscription := true
|
wildcard_subscription := true
|
||||||
} = emqx_channel:caps(channel()).
|
} when is_integer(Level), emqx_channel:caps(channel())).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for channel handle_in
|
%% Test cases for channel handle_in
|
||||||
|
@ -216,14 +217,14 @@ t_handle_in_qos2_publish_with_error_return(_) ->
|
||||||
t_handle_in_puback_ok(_) ->
|
t_handle_in_puback_ok(_) ->
|
||||||
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
||||||
ok = meck:expect(emqx_session, puback,
|
ok = meck:expect(emqx_session, puback,
|
||||||
fun(_PacketId, Session) -> {ok, Msg, Session} end),
|
fun(_, _PacketId, Session) -> {ok, Msg, Session} end),
|
||||||
Channel = channel(#{conn_state => connected}),
|
Channel = channel(#{conn_state => connected}),
|
||||||
{ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
|
{ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
|
||||||
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
||||||
|
|
||||||
t_handle_in_puback_id_in_use(_) ->
|
t_handle_in_puback_id_in_use(_) ->
|
||||||
ok = meck:expect(emqx_session, puback,
|
ok = meck:expect(emqx_session, puback,
|
||||||
fun(_, _Session) ->
|
fun(_, _, _Session) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
||||||
end),
|
end),
|
||||||
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
||||||
|
@ -231,7 +232,7 @@ t_handle_in_puback_id_in_use(_) ->
|
||||||
|
|
||||||
t_handle_in_puback_id_not_found(_) ->
|
t_handle_in_puback_id_not_found(_) ->
|
||||||
ok = meck:expect(emqx_session, puback,
|
ok = meck:expect(emqx_session, puback,
|
||||||
fun(_, _Session) ->
|
fun(_, _, _Session) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
end),
|
end),
|
||||||
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
{ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
||||||
|
@ -305,13 +306,13 @@ t_handle_in_pubrel_not_found_error(_) ->
|
||||||
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
|
emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
|
||||||
|
|
||||||
t_handle_in_pubcomp_ok(_) ->
|
t_handle_in_pubcomp_ok(_) ->
|
||||||
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
|
ok = meck:expect(emqx_session, pubcomp, fun(_, _, Session) -> {ok, Session} end),
|
||||||
{ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()).
|
{ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()).
|
||||||
% ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
% ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
||||||
|
|
||||||
t_handle_in_pubcomp_not_found_error(_) ->
|
t_handle_in_pubcomp_not_found_error(_) ->
|
||||||
ok = meck:expect(emqx_session, pubcomp,
|
ok = meck:expect(emqx_session, pubcomp,
|
||||||
fun(_PacketId, _Session) ->
|
fun(_, _PacketId, _Session) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
end),
|
end),
|
||||||
Channel = channel(#{conn_state => connected}),
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
@ -633,7 +634,7 @@ t_handle_timeout_keepalive(_) ->
|
||||||
|
|
||||||
t_handle_timeout_retry_delivery(_) ->
|
t_handle_timeout_retry_delivery(_) ->
|
||||||
TRef = make_ref(),
|
TRef = make_ref(),
|
||||||
ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
|
ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end),
|
||||||
Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
|
Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
|
||||||
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
|
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
|
||||||
|
|
||||||
|
|
|
@ -165,7 +165,7 @@ t_puback(_) ->
|
||||||
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
|
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>),
|
||||||
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
|
Session = session(#{inflight => Inflight, mqueue => mqueue()}),
|
||||||
{ok, Msg, Session1} = emqx_session:puback(1, Session),
|
{ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
||||||
|
|
||||||
t_puback_with_dequeue(_) ->
|
t_puback_with_dequeue(_) ->
|
||||||
|
@ -174,7 +174,7 @@ t_puback_with_dequeue(_) ->
|
||||||
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
|
Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>),
|
||||||
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
|
{_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})),
|
||||||
Session = session(#{inflight => Inflight, mqueue => Q}),
|
Session = session(#{inflight => Inflight, mqueue => Q}),
|
||||||
{ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(1, Session),
|
{ok, Msg1, [{_, Msg3}], Session1} = emqx_session:puback(clientinfo(), 1, Session),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
|
||||||
?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
|
?assertEqual(0, emqx_session:info(mqueue_len, Session1)),
|
||||||
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
|
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
|
||||||
|
@ -182,10 +182,10 @@ t_puback_with_dequeue(_) ->
|
||||||
t_puback_error_packet_id_in_use(_) ->
|
t_puback_error_packet_id_in_use(_) ->
|
||||||
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
|
||||||
emqx_session:puback(1, session(#{inflight => Inflight})).
|
emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})).
|
||||||
|
|
||||||
t_puback_error_packet_id_not_found(_) ->
|
t_puback_error_packet_id_not_found(_) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(1, session()).
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:puback(clientinfo(), 1, session()).
|
||||||
|
|
||||||
t_pubrec(_) ->
|
t_pubrec(_) ->
|
||||||
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
||||||
|
@ -213,17 +213,17 @@ t_pubrel_error_packetid_not_found(_) ->
|
||||||
t_pubcomp(_) ->
|
t_pubcomp(_) ->
|
||||||
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight}),
|
Session = session(#{inflight => Inflight}),
|
||||||
{ok, Session1} = emqx_session:pubcomp(1, Session),
|
{ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
|
||||||
|
|
||||||
t_pubcomp_error_packetid_in_use(_) ->
|
t_pubcomp_error_packetid_in_use(_) ->
|
||||||
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
|
||||||
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()),
|
||||||
Session = session(#{inflight => Inflight}),
|
Session = session(#{inflight => Inflight}),
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(1, Session).
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubcomp(clientinfo(), 1, Session).
|
||||||
|
|
||||||
t_pubcomp_error_packetid_not_found(_) ->
|
t_pubcomp_error_packetid_not_found(_) ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(1, session()).
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubcomp(clientinfo(), 1, session()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for deliver/retry
|
%% Test cases for deliver/retry
|
||||||
|
@ -231,14 +231,16 @@ t_pubcomp_error_packetid_not_found(_) ->
|
||||||
|
|
||||||
t_dequeue(_) ->
|
t_dequeue(_) ->
|
||||||
Q = mqueue(#{store_qos0 => true}),
|
Q = mqueue(#{store_qos0 => true}),
|
||||||
{ok, Session} = emqx_session:dequeue(session(#{mqueue => Q})),
|
{ok, Session} = emqx_session:dequeue(clientinfo(), session(#{mqueue => Q})),
|
||||||
Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
|
Msgs = [emqx_message:make(clientid, ?QOS_0, <<"t0">>, <<"payload">>),
|
||||||
emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
|
emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
|
||||||
emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
|
emqx_message:make(clientid, ?QOS_2, <<"t2">>, <<"payload">>)
|
||||||
],
|
],
|
||||||
Session1 = lists:foldl(fun emqx_session:enqueue/2, Session, Msgs),
|
Session1 = lists:foldl(fun(Msg, Session0) ->
|
||||||
|
emqx_session:enqueue(clientinfo(), Msg, Session0)
|
||||||
|
end, Session, Msgs),
|
||||||
{ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
|
{ok, [{undefined, Msg0}, {1, Msg1}, {2, Msg2}], Session2} =
|
||||||
emqx_session:dequeue(Session1),
|
emqx_session:dequeue(clientinfo(), Session1),
|
||||||
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
|
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)),
|
||||||
?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
|
?assertEqual(<<"t0">>, emqx_message:topic(Msg0)),
|
||||||
|
@ -253,7 +255,7 @@ t_deliver_qos0(_) ->
|
||||||
clientinfo(), <<"t1">>, subopts(), Session),
|
clientinfo(), <<"t1">>, subopts(), Session),
|
||||||
Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
|
Deliveries = [delivery(?QOS_0, T) || T <- [<<"t0">>, <<"t1">>]],
|
||||||
{ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
|
{ok, [{undefined, Msg1}, {undefined, Msg2}], Session1} =
|
||||||
emqx_session:deliver(Deliveries, Session1),
|
emqx_session:deliver(clientinfo(), Deliveries, Session1),
|
||||||
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
|
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
|
||||||
|
|
||||||
|
@ -262,38 +264,38 @@ t_deliver_qos1(_) ->
|
||||||
{ok, Session} = emqx_session:subscribe(
|
{ok, Session} = emqx_session:subscribe(
|
||||||
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
|
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
|
||||||
Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
|
Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
|
||||||
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
|
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
||||||
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
|
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
|
||||||
{ok, Msg1, Session2} = emqx_session:puback(1, Session1),
|
{ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
||||||
{ok, Msg2, Session3} = emqx_session:puback(2, Session2),
|
{ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
|
||||||
|
|
||||||
t_deliver_qos2(_) ->
|
t_deliver_qos2(_) ->
|
||||||
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
||||||
Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
|
Delivers = [delivery(?QOS_2, <<"t0">>), delivery(?QOS_2, <<"t1">>)],
|
||||||
{ok, [{1, Msg1}, {2, Msg2}], Session} =
|
{ok, [{1, Msg1}, {2, Msg2}], Session} =
|
||||||
emqx_session:deliver(Delivers, session()),
|
emqx_session:deliver(clientinfo(), Delivers, session()),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session)),
|
||||||
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
|
?assertEqual(<<"t0">>, emqx_message:topic(Msg1)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg2)).
|
||||||
|
|
||||||
t_deliver_one_msg(_) ->
|
t_deliver_one_msg(_) ->
|
||||||
{ok, [{1, Msg}], Session} =
|
{ok, [{1, Msg}], Session} =
|
||||||
emqx_session:deliver([delivery(?QOS_1, <<"t1">>)], session()),
|
emqx_session:deliver(clientinfo(), [delivery(?QOS_1, <<"t1">>)], session()),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg)).
|
||||||
|
|
||||||
t_deliver_when_inflight_is_full(_) ->
|
t_deliver_when_inflight_is_full(_) ->
|
||||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||||
Session = session(#{inflight => emqx_inflight:new(1)}),
|
Session = session(#{inflight => emqx_inflight:new(1)}),
|
||||||
{ok, Publishes, Session1} = emqx_session:deliver(Delivers, Session),
|
{ok, Publishes, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
|
||||||
?assertEqual(1, length(Publishes)),
|
?assertEqual(1, length(Publishes)),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session1)),
|
||||||
?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
|
?assertEqual(1, emqx_session:info(mqueue_len, Session1)),
|
||||||
{ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(1, Session1),
|
{ok, Msg1, [{2, Msg2}], Session2} = emqx_session:puback(clientinfo(), 1, Session1),
|
||||||
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
|
||||||
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
|
?assertEqual(0, emqx_session:info(mqueue_len, Session2)),
|
||||||
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
|
||||||
|
@ -301,18 +303,18 @@ t_deliver_when_inflight_is_full(_) ->
|
||||||
|
|
||||||
t_enqueue(_) ->
|
t_enqueue(_) ->
|
||||||
%% store_qos0 = true
|
%% store_qos0 = true
|
||||||
Session = emqx_session:enqueue([delivery(?QOS_0, <<"t0">>)], session()),
|
Session = emqx_session:enqueue(clientinfo(), [delivery(?QOS_0, <<"t0">>)], session()),
|
||||||
Session1 = emqx_session:enqueue([delivery(?QOS_1, <<"t1">>),
|
Session1 = emqx_session:enqueue(clientinfo(), [delivery(?QOS_1, <<"t1">>),
|
||||||
delivery(?QOS_2, <<"t2">>)], Session),
|
delivery(?QOS_2, <<"t2">>)], Session),
|
||||||
?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
|
?assertEqual(3, emqx_session:info(mqueue_len, Session1)).
|
||||||
|
|
||||||
t_retry(_) ->
|
t_retry(_) ->
|
||||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||||
Session = session(#{retry_interval => 100}),
|
Session = session(#{retry_interval => 100}),
|
||||||
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
|
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
|
||||||
ok = timer:sleep(200),
|
ok = timer:sleep(200),
|
||||||
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
||||||
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
|
{ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -331,11 +333,11 @@ t_resume(_) ->
|
||||||
|
|
||||||
t_replay(_) ->
|
t_replay(_) ->
|
||||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||||
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, session()),
|
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, session()),
|
||||||
Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
|
Msg = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload">>),
|
||||||
Session2 = emqx_session:enqueue(Msg, Session1),
|
Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
|
||||||
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
|
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
|
||||||
{ok, ReplayPubs, Session3} = emqx_session:replay(Session2),
|
{ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
|
||||||
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
|
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs),
|
||||||
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
|
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue