diff --git a/.gitignore b/.gitignore
index 28178b1f0..eda7d5652 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,7 +30,6 @@ compile_commands.json
cuttlefish
xrefr
*.coverdata
-etc/emqx.conf.rendered
Mnesia.*/
*.DS_Store
_checkouts
@@ -63,3 +62,5 @@ erlang_ls.config
# elixir
mix.lock
apps/emqx/test/emqx_static_checks_data/
+# rendered configurations
+*.conf.rendered
diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index 2c0a74fe9..0ea190254 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -119,7 +119,33 @@
quota_timer => expire_quota_limit
}).
--define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
+-define(CHANNEL_METRICS,
+ [ recv_pkt
+ , recv_msg
+ , 'recv_msg.qos0'
+ , 'recv_msg.qos1'
+ , 'recv_msg.qos2'
+ , 'recv_msg.dropped'
+ , 'recv_msg.dropped.await_pubrel_timeout'
+ , send_pkt
+ , send_msg
+ , 'send_msg.qos0'
+ , 'send_msg.qos1'
+ , 'send_msg.qos2'
+ , 'send_msg.dropped'
+ , 'send_msg.dropped.expired'
+ , 'send_msg.dropped.queue_full'
+ , 'send_msg.dropped.too_large'
+ ]).
+
+-define(INFO_KEYS,
+ [ conninfo
+ , conn_state
+ , clientinfo
+ , session
+ , will_msg
+ ]).
+
-define(LIMITER_ROUTING, message_routing).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@@ -184,10 +210,9 @@ set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = Client
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#channel{session = Session1}.
-%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Session})->
- emqx_session:stats(Session).
+ lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
-spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := Zone}}) ->
@@ -1437,7 +1462,7 @@ process_alias(Packet = #mqtt_packet{
{ok, Topic} ->
NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
{ok, Packet#mqtt_packet{variable = NPublish}, Channel};
- false -> {error, ?RC_PROTOCOL_ERROR}
+ error -> {error, ?RC_PROTOCOL_ERROR}
end;
process_alias(#mqtt_packet{
@@ -1778,7 +1803,7 @@ run_hooks(Name, Args, Acc) ->
-compile({inline, [find_alias/3, save_alias/4]}).
-find_alias(_, _, undefined) -> false;
+find_alias(_, _, undefined) -> error;
find_alias(inbound, AliasId, _TopicAliases = #{inbound := Aliases}) ->
maps:find(AliasId, Aliases);
find_alias(outbound, Topic, _TopicAliases = #{outbound := Aliases}) ->
diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl
index 43566d1df..b554233a7 100644
--- a/apps/emqx/src/emqx_connection.erl
+++ b/apps/emqx/src/emqx_connection.erl
@@ -131,25 +131,6 @@
, sockstate
]).
--define(CONN_STATS,
- [ recv_pkt
- , recv_msg
- , 'recv_msg.qos0'
- , 'recv_msg.qos1'
- , 'recv_msg.qos2'
- , 'recv_msg.dropped'
- , 'recv_msg.dropped.await_pubrel_timeout'
- , send_pkt
- , send_msg
- , 'send_msg.qos0'
- , 'send_msg.qos1'
- , 'send_msg.qos2'
- , 'send_msg.dropped'
- , 'send_msg.dropped.expired'
- , 'send_msg.dropped.queue_full'
- , 'send_msg.dropped.too_large'
- ]).
-
-define(SOCK_STATS,
[ recv_oct
, recv_cnt
@@ -236,10 +217,9 @@ stats(#state{transport = Transport,
{ok, Ss} -> Ss;
{error, _} -> []
end,
- ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
- lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
+ lists:append([SockStats, ChanStats, ProcStats]).
%% @doc Set TCP keepalive socket options to override system defaults.
%% Idle: The number of seconds a connection needs to be idle before
@@ -1030,12 +1010,12 @@ inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
inc_counter(send_pkt, 1),
- case Type =:= ?PUBLISH of
- true ->
+ case Type of
+ ?PUBLISH ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1),
inc_qos_stats(send_msg, Packet);
- false ->
+ _ ->
ok
end,
emqx_metrics:inc_sent(Packet).
diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl
index 7a4dba368..cd5b7c515 100644
--- a/apps/emqx/src/emqx_ws_connection.erl
+++ b/apps/emqx/src/emqx_ws_connection.erl
@@ -112,7 +112,6 @@
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
--define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(ENABLED(X), (X =/= undefined)).
-define(LIMITER_BYTES_IN, bytes_in).
@@ -163,10 +162,9 @@ stats(WsPid) when is_pid(WsPid) ->
call(WsPid, stats);
stats(#state{channel = Channel}) ->
SockStats = emqx_pd:get_counters(?SOCK_STATS),
- ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
- lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
+ lists:append([SockStats, ChanStats, ProcStats]).
%% kick|discard|takeover
-spec(call(pid(), Req :: term()) -> Reply :: term()).
@@ -725,6 +723,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
packet => emqx_packet:format(Packet)}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
+ ok = inc_outgoing_stats({error, message_too_large}),
<<>>;
Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
ok = inc_outgoing_stats(Packet),
@@ -762,19 +761,28 @@ inc_recv_stats(Cnt, Oct) ->
inc_incoming_stats(Packet = ?PACKET(Type)) ->
_ = emqx_pd:inc_counter(recv_pkt, 1),
- if Type == ?PUBLISH ->
- inc_counter(recv_msg, 1),
- inc_counter(incoming_pubs, 1);
- true -> ok
+ case Type of
+ ?PUBLISH ->
+ inc_counter(recv_msg, 1),
+ inc_qos_stats(recv_msg, Packet),
+ inc_counter(incoming_pubs, 1);
+ _ ->
+ ok
end,
emqx_metrics:inc_recv(Packet).
+inc_outgoing_stats({error, message_too_large}) ->
+ inc_counter('send_msg.dropped', 1),
+ inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
- _ = emqx_pd:inc_counter(send_pkt, 1),
- if Type == ?PUBLISH ->
- inc_counter(send_msg, 1),
- inc_counter(outgoing_pubs, 1);
- true -> ok
+ inc_counter(send_pkt, 1),
+ case Type of
+ ?PUBLISH ->
+ inc_counter(send_msg, 1),
+ inc_counter(outgoing_pubs, 1),
+ inc_qos_stats(send_msg, Packet);
+ _ ->
+ ok
end,
emqx_metrics:inc_sent(Packet).
@@ -787,6 +795,25 @@ inc_sent_stats(Cnt, Oct) ->
inc_counter(Name, Value) ->
_ = emqx_pd:inc_counter(Name, Value),
ok.
+
+inc_qos_stats(Type, Packet) ->
+ case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
+ undefined ->
+ ignore;
+ Key ->
+ inc_counter(Key, 1)
+ end.
+
+inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
+inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
+inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
+
+inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
+inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
+inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
+%% for bad qos
+inc_qos_stats_key(_, _) -> undefined.
+
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl
index a0f3a935b..d97b0197b 100644
--- a/apps/emqx/test/emqx_channel_SUITE.erl
+++ b/apps/emqx/test/emqx_channel_SUITE.erl
@@ -883,6 +883,13 @@ t_process_alias(_) ->
{ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} =
emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
+t_process_alias_inexistent_alias(_) ->
+ Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+ Channel = channel(),
+ ?assertEqual(
+ {error, ?RC_PROTOCOL_ERROR},
+ emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel)).
+
t_packing_alias(_) ->
Packet1 = #mqtt_packet{variable = #mqtt_packet_publish{
topic_name = <<"x">>,
@@ -919,6 +926,20 @@ t_packing_alias(_) ->
#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
channel())).
+t_packing_alias_inexistent_alias(_) ->
+ Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+ Channel = channel(),
+ Packet = #mqtt_packet{variable = Publish},
+ ExpectedChannel = emqx_channel:set_field(
+ topic_aliases,
+ #{ inbound => #{}
+ , outbound => #{<<>> => 1}
+ },
+ Channel),
+ ?assertEqual(
+ {Packet, ExpectedChannel},
+ emqx_channel:packing_alias(Packet, Channel)).
+
t_check_pub_authz(_) ->
emqx_config:put_zone_conf(default, [authorization, enable], true),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl
index 0fd7ea94b..ac4d1ba03 100644
--- a/apps/emqx/test/emqx_common_test_helpers.erl
+++ b/apps/emqx/test/emqx_common_test_helpers.erl
@@ -49,6 +49,8 @@
, render_config_file/2
, read_schema_configs/2
, load_config/2
+ , is_tcp_server_available/2
+ , is_tcp_server_available/3
]).
-define( CERTS_PATH(CertName), filename:join( [ "etc", "certs", CertName ]) ).
@@ -111,6 +113,7 @@
] }
]).
+-define(DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT, 1000).
%%------------------------------------------------------------------------------
%% APIs
@@ -433,3 +436,19 @@ copy_certs(_, _) -> ok.
load_config(SchemaModule, Config) ->
ok = emqx_config:delete_override_conf_files(),
ok = emqx_config:init_load(SchemaModule, Config).
+
+-spec is_tcp_server_available(Host :: inet:socket_address() | inet:hostname(),
+ Port :: inet:port_number()) -> boolean.
+is_tcp_server_available(Host, Port) ->
+ is_tcp_server_available(Host, Port, ?DEFAULT_TCP_SERVER_CHECK_AVAIL_TIMEOUT).
+
+-spec is_tcp_server_available(Host :: inet:socket_address() | inet:hostname(),
+ Port :: inet:port_number(), Timeout :: integer()) -> boolean.
+is_tcp_server_available(Host, Port, Timeout) ->
+ case gen_tcp:connect(Host, Port, [], Timeout) of
+ {ok, Socket} ->
+ gen_tcp:close(Socket),
+ true;
+ {error, _} ->
+ false
+ end.
diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl
index 6ac548459..a2f5c3502 100644
--- a/apps/emqx/test/emqx_ws_connection_SUITE.erl
+++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl
@@ -178,8 +178,9 @@ t_stats(_) ->
end
end),
Stats = ?ws_conn:call(WsPid, stats),
- [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
- {recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}|_] = Stats.
+ [?assert(lists:member(V, Stats)) || V <-
+ [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0},
+ {recv_pkt, 0}, {recv_msg, 0}, {send_pkt, 0}, {send_msg, 0}]].
t_call(_) ->
Info = ?ws_conn:info(st()),
diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
index cb97e3d48..4e69c6fcc 100644
--- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl
@@ -47,7 +47,7 @@ end_per_testcase(_TestCase, _Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
index 73cdd6a53..0531ce249 100644
--- a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl
@@ -43,7 +43,7 @@ init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
index bd5ab3126..489a602c0 100644
--- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
@@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
index bc38b8910..d20b7b50d 100644
--- a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl
@@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
index 714c9715e..6a5e07939 100644
--- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
@@ -54,7 +54,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
index 0225ed06f..b2ec46fd5 100644
--- a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl
@@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
index 5a376b9c7..64805ecb7 100644
--- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
@@ -53,7 +53,7 @@ end_per_group(require_seeds, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
index ba7f87272..84f67937b 100644
--- a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
+++ b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl
@@ -44,7 +44,7 @@ init_per_testcase(_, Config) ->
init_per_suite(Config) ->
_ = application:load(emqx_conf),
- case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
diff --git a/apps/emqx_authn/test/emqx_authn_test_lib.erl b/apps/emqx_authn/test/emqx_authn_test_lib.erl
index 2c3437281..357d8b05b 100644
--- a/apps/emqx_authn/test/emqx_authn_test_lib.erl
+++ b/apps/emqx_authn/test/emqx_authn_test_lib.erl
@@ -21,8 +21,6 @@
-compile(nowarn_export_all).
-compile(export_all).
--define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
-
authenticator_example(Id) ->
#{Id := #{value := Example}} = emqx_authn_api:authenticator_examples(),
Example.
@@ -57,15 +55,6 @@ delete_config(ID) ->
{delete_authenticator, ?GLOBAL, ID},
#{rawconf_with_defaults => false}).
-is_tcp_server_available(Host, Port) ->
- case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
- {ok, Socket} ->
- gen_tcp:close(Socket),
- true;
- {error, _} ->
- false
- end.
-
client_ssl_cert_opts() ->
Dir = code:lib_dir(emqx_authn, test),
#{keyfile => filename:join([Dir, "data/certs", "client.key"]),
diff --git a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
index 62f9f932e..4b990f125 100644
--- a/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl
@@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_authz_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
index d1b8d0934..0cccd748e 100644
--- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
@@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
index 45745326d..fbc2cc922 100644
--- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
@@ -34,7 +34,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
index cf784b1ab..83699f51c 100644
--- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
@@ -35,7 +35,7 @@ groups() ->
[].
init_per_suite(Config) ->
- case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+ case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
diff --git a/apps/emqx_authz/test/emqx_authz_test_lib.erl b/apps/emqx_authz/test/emqx_authz_test_lib.erl
index b7de5c3da..6225d3a83 100644
--- a/apps/emqx_authz/test/emqx_authz_test_lib.erl
+++ b/apps/emqx_authz/test/emqx_authz_test_lib.erl
@@ -45,15 +45,6 @@ setup_config(BaseConfig, SpecialParams) ->
{error, Reason} -> {error, Reason}
end.
-is_tcp_server_available(Host, Port) ->
- case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
- {ok, Socket} ->
- gen_tcp:close(Socket),
- true;
- {error, _} ->
- false
- end.
-
test_samples(ClientInfo, Samples) ->
lists:foreach(
fun({Expected, Action, Topic}) ->
diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl
index de438487e..7883193cf 100644
--- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl
+++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl
@@ -18,7 +18,10 @@
-behaviour(minirest_api).
--export([api_spec/0]).
+-export([ api_spec/0
+ , paths/0
+ , schema/1
+ ]).
-export([auto_subscribe/2]).
@@ -29,54 +32,30 @@
-include_lib("emqx/include/emqx_placeholder.hrl").
api_spec() ->
- {[auto_subscribe_api()], []}.
+ emqx_dashboard_swagger:spec(?MODULE).
-schema() ->
- case emqx_mgmt_api_configs:gen_schema(emqx:get_config([auto_subscribe, topics])) of
- #{example := <<>>, type := string} ->
- emqx_mgmt_util:schema(auto_subscribe_conf_example());
- Example ->
- emqx_mgmt_util:schema(Example)
- end.
+paths() ->
+ ["/mqtt/auto_subscribe"].
-auto_subscribe_conf_example() ->
+schema("/mqtt/auto_subscribe") ->
#{
- type => array,
- items => #{
- type => object,
- properties =>#{
- topic => #{
- type => string,
- example => <<
- "/clientid/", ?PH_S_CLIENTID,
- "/username/", ?PH_S_USERNAME,
- "/host/", ?PH_S_HOST,
- "/port/", ?PH_S_PORT>>},
- qos => #{example => 0, type => number, enum => [0, 1, 2]},
- rh => #{example => 0, type => number, enum => [0, 1, 2]},
- nl => #{example => 0, type => number, enum => [0, 1]},
- rap => #{example => 0, type => number, enum => [0, 1]}
- }
- }
- }.
-
-auto_subscribe_api() ->
- Metadata = #{
+ 'operationId' => auto_subscribe,
get => #{
description => <<"Auto subscribe list">>,
responses => #{
- <<"200">> => schema()}},
+ 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe")
+ }
+ },
put => #{
description => <<"Update auto subscribe topic list">>,
- 'requestBody' => schema(),
+ 'requestBody' => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
responses => #{
- <<"200">> => schema(),
- <<"400">> => emqx_mgmt_util:error_schema(
+ 200 => hoconsc:ref(emqx_auto_subscribe_schema, "auto_subscribe"),
+ 400 => emqx_mgmt_util:error_schema(
<<"Request body required">>, [?BAD_REQUEST]),
- <<"409">> => emqx_mgmt_util:error_schema(
+ 409 => emqx_mgmt_util:error_schema(
<<"Auto Subscribe topics max limit">>, [?EXCEED_LIMIT])}}
- },
- {"/mqtt/auto_subscribe", Metadata, auto_subscribe}.
+ }.
%%%==============================================================================================
%% api apply
diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl
index ac3100ed4..917030266 100644
--- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl
+++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl
@@ -18,6 +18,7 @@
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/emqx_placeholder.hrl").
-export([ namespace/0
, roots/0
@@ -33,15 +34,19 @@ fields("auto_subscribe") ->
];
fields("topic") ->
- [ {topic, sc(binary(), #{})}
- , {qos, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
- #{default => 0})}
- , {rh, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
- #{default => 0})}
- , {rap, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1)]), #{default => 0})}
- , {nl, sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1)]), #{default => 0})}
+ [ {topic, sc(binary(), #{example => topic_example()})}
+ , {qos, sc(emqx_schema:qos(), #{default => 0})}
+ , {rh, sc(range(0,2), #{default => 0})}
+ , {rap, sc(range(0, 1), #{default => 0})}
+ , {nl, sc(range(0, 1), #{default => 0})}
].
+topic_example() ->
+ <<"/clientid/", ?PH_S_CLIENTID,
+ "/username/", ?PH_S_USERNAME,
+ "/host/", ?PH_S_HOST,
+ "/port/", ?PH_S_PORT>>.
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl
index 48733b5e9..f3cb05d4f 100644
--- a/apps/emqx_connector/src/emqx_connector_ldap.erl
+++ b/apps/emqx_connector/src/emqx_connector_ldap.erl
@@ -28,7 +28,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
- , on_jsonify/1
]).
-export([do_health_check/1]).
@@ -43,9 +42,6 @@ roots() ->
%% this schema has no sub-structs
fields(_) -> [].
-on_jsonify(Config) ->
- Config.
-
%% ===================================================================
on_start(InstId, #{servers := Servers0,
port := Port,
diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl
index 8926587cb..b8b10aca0 100644
--- a/apps/emqx_connector/src/emqx_connector_mongo.erl
+++ b/apps/emqx_connector/src/emqx_connector_mongo.erl
@@ -31,7 +31,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
- , on_jsonify/1
]).
%% ecpool callback
@@ -97,9 +96,6 @@ mongo_fields() ->
] ++
emqx_connector_schema_lib:ssl_fields().
-on_jsonify(Config) ->
- Config.
-
%% ===================================================================
on_start(InstId, Config = #{mongo_type := Type,
diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl
index 548721a93..1af0cc34a 100644
--- a/apps/emqx_connector/src/emqx_connector_mysql.erl
+++ b/apps/emqx_connector/src/emqx_connector_mysql.erl
@@ -25,7 +25,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
- , on_jsonify/1
]).
-export([connect/1]).
@@ -43,11 +42,6 @@ fields(config) ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
-%%=====================================================================
-
-on_jsonify(#{server := Server}= Config) ->
- Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
-
%% ===================================================================
on_start(InstId, #{server := {Host, Port},
database := DB,
diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl
index 7dcaf7373..9468257ca 100644
--- a/apps/emqx_connector/src/emqx_connector_pgsql.erl
+++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl
@@ -28,7 +28,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
- , on_jsonify/1
]).
-export([connect/1]).
@@ -49,9 +48,6 @@ fields(config) ->
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:ssl_fields().
-on_jsonify(#{server := Server}= Config) ->
- Config#{server => emqx_connector_schema_lib:ip_port_to_string(Server)}.
-
named_queries(type) -> map();
named_queries(nullable) -> true;
named_queries(_) -> undefined.
diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl
index 3907e24f9..fbec547f7 100644
--- a/apps/emqx_connector/src/emqx_connector_redis.erl
+++ b/apps/emqx_connector/src/emqx_connector_redis.erl
@@ -43,7 +43,6 @@
, on_stop/2
, on_query/4
, on_health_check/2
- , on_jsonify/1
]).
-export([do_health_check/1]).
@@ -85,9 +84,6 @@ fields(sentinel) ->
redis_fields() ++
emqx_connector_schema_lib:ssl_fields().
-on_jsonify(Config) ->
- Config.
-
%% ===================================================================
on_start(InstId, #{redis_type := Type,
database := Database,
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 66d005e7d..3c75e50ec 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -236,7 +236,7 @@ Template with variables is allowed."""
].
qos() ->
- hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
+ hoconsc:union([emqx_schema:qos(), binary()]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index 4e73384ec..9e5e44273 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -21,6 +21,7 @@
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
+-include("emqx_dashboard/include/emqx_dashboard.hrl").
%% output functions
-export([ inspect/3
diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
new file mode 100644
index 000000000..8f04bd71a
--- /dev/null
+++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
@@ -0,0 +1,143 @@
+% %%--------------------------------------------------------------------
+% %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+% %%
+% %% Licensed under the Apache License, Version 2.0 (the "License");
+% %% you may not use this file except in compliance with the License.
+% %% You may obtain a copy of the License at
+% %% http://www.apache.org/licenses/LICENSE-2.0
+% %%
+% %% Unless required by applicable law or agreed to in writing, software
+% %% distributed under the License is distributed on an "AS IS" BASIS,
+% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% %% See the License for the specific language governing permissions and
+% %% limitations under the License.
+% %%--------------------------------------------------------------------
+
+-module(emqx_connector_pgsql_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(PGSQL_HOST, "pgsql").
+-define(PGSQL_PORT, 5432).
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+ [].
+
+init_per_suite(Config) ->
+ case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+ true ->
+ Config;
+ false ->
+ {skip, no_pgsql}
+ end.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_testcase(_, Config) ->
+ ?assertEqual(
+ {ok, #{poolname => emqx_connector_pgsql}},
+ emqx_connector_pgsql:on_start(<<"emqx_connector_pgsql">>, pgsql_config())
+ ),
+ Config.
+
+end_per_testcase(_, _Config) ->
+ ?assertEqual(
+ ok,
+ emqx_connector_pgsql:on_stop(<<"emqx_connector_pgsql">>, #{poolname => emqx_connector_pgsql})
+ ).
+
+% %%------------------------------------------------------------------------------
+% %% Testcases
+% %%------------------------------------------------------------------------------
+
+% Simple test to make sure the proper reference to the module is returned.
+t_roots(_Config) ->
+ ExpectedRoots = [{config, #{type => {ref, emqx_connector_pgsql, config}}}],
+ ActualRoots = emqx_connector_pgsql:roots(),
+ ?assertEqual(ExpectedRoots, ActualRoots).
+
+% Not sure if this level of testing is appropriate for this function.
+% Checking the actual values/types of the returned term starts getting
+% into checking the emqx_connector_schema_lib.erl returns and the shape
+% of expected data elsewhere.
+t_fields(_Config) ->
+ Fields = emqx_connector_pgsql:fields(config),
+ lists:foreach(
+ fun({FieldName, FieldValue}) ->
+ ?assert(is_atom(FieldName)),
+ if
+ is_map(FieldValue) ->
+ ?assert(maps:is_key(type, FieldValue) and maps:is_key(default, FieldValue));
+ true ->
+ ?assert(is_function(FieldValue))
+ end
+ end,
+ Fields
+ ).
+
+% Execute a minimal query to validate connection.
+t_basic_query(_Config) ->
+ ?assertMatch(
+ {ok, _, [{1}]},
+ emqx_connector_pgsql:on_query(
+ <<"emqx_connector_pgsql">>, {query, test_query()}, undefined, #{
+ poolname => emqx_connector_pgsql
+ }
+ )
+ ).
+
+% Perform health check.
+t_do_healthcheck(_Config) ->
+ ?assertEqual(
+ {ok, #{poolname => emqx_connector_pgsql}},
+ emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql">>, #{
+ poolname => emqx_connector_pgsql
+ })
+ ).
+
+% Perform healthcheck on a connector that does not exist.
+t_healthceck_when_connector_does_not_exist(_Config) ->
+ ?assertEqual(
+ {error, health_check_failed, #{poolname => emqx_connector_pgsql_does_not_exist}},
+ emqx_connector_pgsql:on_health_check(<<"emqx_connector_pgsql_does_not_exist">>, #{
+ poolname => emqx_connector_pgsql_does_not_exist
+ })
+ ).
+
+% %%------------------------------------------------------------------------------
+% %% Helpers
+% %%------------------------------------------------------------------------------
+
+pgsql_config() ->
+ #{
+ auto_reconnect => true,
+ database => <<"mqtt">>,
+ username => <<"root">>,
+ password => <<"public">>,
+ pool_size => 8,
+ server => {?PGSQL_HOST, ?PGSQL_PORT},
+ ssl => #{enable => false}
+ }.
+
+pgsql_bad_config() ->
+ #{
+ auto_reconnect => true,
+ database => <<"bad_mqtt">>,
+ username => <<"bad_root">>,
+ password => <<"bad_public">>,
+ pool_size => 8,
+ server => {?PGSQL_HOST, ?PGSQL_PORT},
+ ssl => #{enable => false}
+ }.
+
+test_query() ->
+ <<"SELECT 1">>.
diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl
index a8666798c..fe1122274 100644
--- a/apps/emqx_exhook/src/emqx_exhook_schema.erl
+++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl
@@ -73,6 +73,9 @@ fields(ssl_conf) ->
, {keyfile,
sc(binary(),
#{example => <<"{{ platform_etc_dir }}/certs/key.pem">>})}
+ , {verify,
+ sc(hoconsc:enum([verify_peer, verify_none]),
+ #{example => <<"verify_none">>})}
].
%% types
diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl
index ecba2b1f2..be610c3bb 100644
--- a/apps/emqx_gateway/src/emqx_gateway_schema.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl
@@ -189,7 +189,7 @@ The type of delivered coap message can be set to:
3. qos: Mapping from QoS type of received message, QoS0 -> non, QoS1,2 -> con"
})}
, {subscribe_qos,
- sc(hoconsc:union([qos0, qos1, qos2, coap]),
+ sc(hoconsc:enum([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for subscribe request.
@@ -202,7 +202,7 @@ The indicator can be set to:
* qos1: If the subscribe request is confirmable"
})}
, {publish_qos,
- sc(hoconsc:union([qos0, qos1, qos2, coap]),
+ sc(hoconsc:enum([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for publish request.
@@ -356,7 +356,7 @@ notifyevents via this topic, if the client reports any resource changes"
fields(translator) ->
[ {topic, sc(binary(), #{nullable => false})}
- , {qos, sc(range(0, 2), #{default => 0})}
+ , {qos, sc(emqx_schema:qos(), #{default => 0})}
];
fields(udp_listeners) ->
diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl
index a2ffa1988..314b2d884 100644
--- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl
+++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl
@@ -136,6 +136,8 @@ parse(<<>>, Parser) ->
parse(Bytes, #{phase := body, length := Len, state := State}) ->
parse(body, Bytes, State, Len);
+parse(<>, #{phase := hdname, state := State}) ->
+ parse(body, Bytes, State, content_len(State));
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
parse(Phase, Bytes, State);
diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
index 6aa22fb3b..cd06e58f5 100644
--- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
@@ -385,6 +385,34 @@ t_1000_msg_send(_) ->
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
end).
+t_sticky_packets_truncate_after_headers(_) ->
+ with_connection(fun(Sock) ->
+ gen_tcp:send(Sock, serialize(<<"CONNECT">>,
+ [{<<"accept-version">>, ?STOMP_VER},
+ {<<"host">>, <<"127.0.0.1:61613">>},
+ {<<"login">>, <<"guest">>},
+ {<<"passcode">>, <<"guest">>},
+ {<<"heart-beat">>, <<"0,0">>}])),
+ {ok, Data} = gen_tcp:recv(Sock, 0),
+ {ok, #stomp_frame{command = <<"CONNECTED">>,
+ headers = _,
+ body = _}, _, _} = parse(Data),
+
+ Topic = <<"/queue/foo">>,
+
+ emqx:subscribe(Topic),
+ gen_tcp:send(Sock, ["SEND\n",
+ "content-length:3\n",
+ "destination:/queue/foo\n"]),
+ timer:sleep(300),
+ gen_tcp:send(Sock, ["\nfoo",0]),
+ receive
+ {deliver, Topic, _Msg}->
+ ok
+ after 100 ->
+ ?assert(false, "waiting message timeout")
+ end
+ end).
t_rest_clienit_info(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
index 5bd3f5681..77658e35b 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
@@ -369,13 +369,13 @@ fields(keepalive) ->
fields(subscribe) ->
[
- {topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
+ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}
];
fields(unsubscribe) ->
[
- {topic, hoconsc:mk(binary(), #{desc => <<"Access type">>})}
+ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})}
];
fields(meta) ->
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index 851d2a4c4..61405b5a9 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -70,7 +70,6 @@
-export([ call_start/3 %% start the instance
, call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance
- , call_jsonify/2
]).
-export([ list_instances/0 %% list all the instances, id only.
@@ -86,11 +85,8 @@
-optional_callbacks([ on_query/4
, on_health_check/2
- , on_jsonify/1
]).
--callback on_jsonify(resource_config()) -> jsx:json_term().
-
%% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
@@ -284,13 +280,6 @@ call_health_check(InstId, Mod, ResourceState) ->
call_stop(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
--spec call_jsonify(module(), resource_config()) -> jsx:json_term().
-call_jsonify(Mod, Config) ->
- case erlang:function_exported(Mod, on_jsonify, 1) of
- false -> Config;
- true -> ?SAFE_CALL(Mod:on_jsonify(Config))
- end.
-
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
check_config(ResourceType, Conf) ->
diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
index 196ec642b..f5148d9e0 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
@@ -207,8 +207,7 @@ fields("ctx_disconnected") ->
].
qos() ->
- {"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
- #{desc => "The Message QoS"})}.
+ {"qos", sc(emqx_schema:qos(), #{desc => "The Message QoS"})}.
rule_id() ->
{"id", sc(binary(),
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index 66a67a0aa..823e0c879 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -194,7 +194,7 @@ outputs() ->
].
qos() ->
- hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
+ hoconsc:union([emqx_schema:qos(), binary()]).
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of