diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index 6cf1371e6..5a8b7a92c 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -26,8 +26,7 @@ -logger_header("[Stomp-Conn]"). -import(emqx_misc, - [ maybe_apply/2 - , start_timer/2 + [ start_timer/2 ]). -export([ start_link/3 @@ -92,6 +91,13 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({nowarn_function, [ ensure_stats_timer/2 + ]}). + +-dialyzer({no_return, [ init/1 + , init_state/3 + ]}). + start_link(Transport, Sock, ProtoEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}. @@ -115,13 +121,7 @@ info(sockname, #state{sockname = Sockname}) -> info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> - ActiveN; -info(stats_timer, #state{stats_timer = StatsTimer}) -> - StatsTimer; -info(limit_timer, #state{limit_timer = LimitTimer}) -> - LimitTimer; -info(limiter, #state{limiter = Limiter}) -> - maybe_apply(fun emqx_limiter:info/1, Limiter). + ActiveN. -spec stats(pid()|state()) -> emqx_types:stats(). stats(CPid) when is_pid(CPid) -> @@ -216,6 +216,12 @@ send(Data, Transport, Sock, ConnPid) -> heartbeat(Transport, Sock) -> Transport:send(Sock, <<$\n>>). +handle_call(info, _From, State) -> + {reply, info(State), State}; + +handle_call(stats, _From, State) -> + {reply, stats(State), State}; + handle_call(discard, _From, State) -> %% TODO: send the DISCONNECT packet? shutdown_and_reply(discared, ok, State); diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index c2851895b..eb628ce47 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -20,6 +20,7 @@ -include("emqx_stomp.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -55,7 +56,7 @@ %% Stomp ClientInfo clientinfo :: emqx_types:clientinfo(), %% Stomp Heartbeats - heart_beats :: emqx_stomp_hearbeat:heartbeat(), + heart_beats :: maybe(emqx_stomp_hearbeat:heartbeat()), %% Stomp Connection State connected = false, %% Timers @@ -65,13 +66,13 @@ %% Subscriptions subscriptions = #{}, %% Send function - sendfun :: function(), + sendfun :: {function(), list()}, %% Heartbeat function - heartfun :: function(), + heartfun :: {function(), list()}, %% The confs for the connection %% TODO: put these configs into a public mem? - allow_anonymous, - default_user + allow_anonymous :: maybe(boolean()), + default_user :: maybe(list()) }). -define(TIMER_TABLE, #{ @@ -96,6 +97,10 @@ awaiting_rel_max ]). +-dialyzer({nowarn_function, [ check_acl/3 + , init/2 + ]}). + -type(pstate() :: #pstate{}). %% @doc Init protocol @@ -417,7 +422,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, {error, dropped, State} end; -send(Frame, State = #pstate{sendfun = {Fun, Args}}) -> +send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) -> ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), Data = emqx_stomp_frame:serialize(Frame), ?LOG(debug, "SEND ~p", [Data]), @@ -681,7 +686,7 @@ allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) -> AllowAnonymous. ensure_connected(State = #pstate{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{ connected => true, connected_at => erlang:system_time(millisecond) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index fbe62e4b2..0c15bd288 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -94,7 +94,8 @@ -type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5 - | non_neg_integer()). + | non_neg_integer() + | binary()). -type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). -type(qos_name() :: qos0 | at_most_once |