diff --git a/apps/emqx_gateway/src/emqx_gateway_registry.erl b/apps/emqx_gateway/src/emqx_gateway_registry.erl index a100636cf..dde440756 100644 --- a/apps/emqx_gateway/src/emqx_gateway_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_registry.erl @@ -71,7 +71,9 @@ start_link() -> , state => any() }. --spec load(gateway_type(), registry_options(), gateway_options()) -> ok | {error, any()}. +-spec load(gateway_type(), registry_options(), gateway_options()) + -> ok + | {error, any()}. load(Type, RgOpts, GwOpts) -> CbMod = proplists:get_value(cbkmod, RgOpts, Type), Dscrptr = #{ cbkmod => CbMod diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index f3b7c9b8a..c9c26a766 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -163,12 +163,21 @@ init(ConnInfo = #{socktype := Socktype, try_dispatch(on_socket_created, wrap(Req), Channel). %% @private -peercert(nossl, ConnInfo) -> +peercert(NoSsl, ConnInfo) when NoSsl == nossl; + NoSsl == undefined -> ConnInfo; peercert(Peercert, ConnInfo) -> - ConnInfo#{peercert => - #{cn => esockd_peercert:common_name(Peercert), - dn => esockd_peercert:subject(Peercert)}}. + Fn = fun(_, V) -> V =/= undefined end, + Infos = maps:filter(Fn, + #{cn => esockd_peercert:common_name(Peercert), + dn => esockd_peercert:subject(Peercert)} + ), + case maps:size(Infos) of + 0 -> + ConnInfo; + _ -> + ConnInfo#{peercert => Infos} + end. %% @private socktype(tcp) -> 'TCP'; diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_conn.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_conn.erl index 256a69b30..5c647a768 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_conn.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_conn.erl @@ -106,12 +106,8 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> %% tcp/ssl/dtls start_link(esockd_transport, Sock, Options) -> Socket = {esockd_transport, Sock}, - case esockd_transport:peername(Sock) of - {ok, Peername} -> - Args = [self(), Socket, Peername, Options], - {ok, proc_lib:spawn_link(?MODULE, init, Args)}; - R = {error, _} -> R - end. + Args = [self(), Socket, undefined, Options], + {ok, proc_lib:spawn_link(?MODULE, init, Args)}. %%-------------------------------------------------------------------- %% API @@ -170,6 +166,12 @@ stop(Pid) -> %% Wrapped funcs %%-------------------------------------------------------------------- +esockd_peername({udp, _SockPid, _Sock}, Peername) -> + Peername; +esockd_peername({esockd_transport, Sock}, _Peername) -> + {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]), + Peername. + esockd_wait(Socket = {udp, _SockPid, _Sock}) -> {ok, Socket}; esockd_wait({esockd_transport, Sock}) -> @@ -222,9 +224,10 @@ send(Data, #state{socket = {esockd_transport, Sock}}) -> -define(DEFAULT_OOM_POLICY, #{enable => true, max_heap_size => 4194304, max_message_queue_len => 32000}). -init(Parent, WrappedSock, Peername, Options) -> +init(Parent, WrappedSock, Peername0, Options) -> case esockd_wait(WrappedSock) of {ok, NWrappedSock} -> + Peername = esockd_peername(NWrappedSock, Peername0), run_loop(Parent, init_state(NWrappedSock, Peername, Options)); {error, Reason} -> ok = esockd_close(WrappedSock), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl index cf2ac4011..9fac0159d 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_conn.erl @@ -720,8 +720,9 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> check_oom(State = #state{oom_policy = OomPolicy}) -> case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of - Shutdown = {shutdown, _Reason} -> - erlang:send(self(), Shutdown); + {shutdown, Reason} -> + %% triggers terminate/2 callback immediately + erlang:exit({shutdown, Reason}); _Other -> ok end, State. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 575add29b..7dc2b2fbf 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -79,7 +79,7 @@ | {event, conn_state()|updated} | {close, Reason :: atom()}). --type(replies() :: emqx_stomp_frame:packet() | reply() | [reply()]). +-type(replies() :: stomp_frame() | reply() | [reply()]). -define(TIMER_TABLE, #{ incoming_timer => incoming, @@ -97,11 +97,6 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). --dialyzer({nowarn_function, [init/2,enrich_conninfo/2,ensure_connected/1, - process_connect/1,handle_in/2,handle_info/2, - ensure_disconnected/2,reverse_heartbeats/1, - negotiate_version/2]}). - %%-------------------------------------------------------------------- %% Init the channel %%-------------------------------------------------------------------- @@ -136,6 +131,7 @@ init(ConnInfo = #{peername := {PeerHost, _}, , clientinfo_override = Override , timers = #{} , transaction = #{} + , conn_state = idle }. setting_peercert_infos(NoSSL, ClientInfo) @@ -180,7 +176,7 @@ enrich_conninfo(_Packet, Channel = #channel{conninfo = ConnInfo}) -> %% XXX: How enrich more infos? NConnInfo = ConnInfo#{ proto_name => <<"STOMP">> - , proto_ver => undefined + , proto_ver => <<"1.2">> , clean_start => true , keepalive => 0 , expiry_interval => 0 @@ -320,7 +316,7 @@ process_connect(Channel = #channel{ %% Handle incoming packet %%-------------------------------------------------------------------- --spec handle_in(emqx_types:packet(), channel()) +-spec handle_in(stomp_frame() | {frame_error, any()}, channel()) -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} @@ -467,12 +463,12 @@ handle_in(?PACKET(?CMD_COMMIT, Headers), Channel) -> case trans_pipeline(lists:reverse(Actions), [], Chann0) of {ok, Outgoings, Chann1} -> maybe_outgoing_receipt(receipt_id(Headers), Outgoings, Chann1); - {error, Reason} -> + {error, Reason, Chann1} -> %% FIXME: atomic for transaction ?? ErrMsg = io_lib:format("Execute transaction ~s falied: ~0p", [TxId, Reason] ), - handle_out(error, {receipt_id(Headers), ErrMsg}, Chann0) + handle_out(error, {receipt_id(Headers), ErrMsg}, Chann1) end end); @@ -563,13 +559,15 @@ handle_out(receipt, ReceiptId, Channel) -> -spec(handle_call(Req :: term(), channel()) -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). + | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}). handle_call(kick, Channel) -> NChannel = ensure_disconnected(kicked, Channel), - shutdown_and_reply(kicked, ok, NChannel); + Frame = error_frame(undefined, <<"Kicked out">>), + shutdown_and_reply(kicked, ok, Frame, NChannel); handle_call(discard, Channel) -> - shutdown_and_reply(discarded, ok, Channel); + Frame = error_frame(undefined, <<"Discarded">>), + shutdown_and_reply(discarded, ok, Frame, Channel); %% XXX: No Session Takeover %handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> @@ -785,8 +783,11 @@ shutdown_with_recepit(Reason, ReceiptId, Channel) -> shutdown(Reason, AckFrame, Channel) -> {shutdown, Reason, AckFrame, Channel}. -shutdown_and_reply(Reason, Reply, Channel) -> - {shutdown, Reason, Reply, Channel}. +%shutdown_and_reply(Reason, Reply, Channel) -> +% {shutdown, Reason, Reply, Channel}. + +shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> + {shutdown, Reason, Reply, OutPkt, Channel}. do_negotiate_version(undefined) -> {ok, <<"1.0">>}; diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl index 73ba14cd2..82653af4e 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_connection.erl @@ -87,11 +87,13 @@ %% GC State gc_state :: emqx_gc:gc_state() | undefined, %% Stats Timer - stats_timer :: disabled | reference(), + stats_timer :: disabled | reference() | undefined, %% Idle Timeout idle_timeout :: integer(), %% Idle Timer - idle_timer :: reference() | undefined + idle_timer :: reference() | undefined, + %% OOM Policy + oom_policy :: emqx_types:oom_policy() | undefined }). -type(state() :: #state{}). @@ -121,11 +123,9 @@ , system_code_change/4 ]}). --dialyzer({nowarn_function, [ensure_stats_timer/2,cancel_stats_timer/1, - terminate/2,handle_call/3,handle_timeout/3, - parse_incoming/3,serialize_and_inc_stats_fun/1, - check_oom/1,inc_incoming_stats/1, - inc_outgoing_stats/1]}). +-dialyzer({no_match, [ handle_call/3 + , serialize_and_inc_stats_fun/1 + ]}). -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). @@ -244,9 +244,7 @@ init_state(Transport, Socket, Options) -> peername => Peername, sockname => Sockname, peercert => Peercert, - conn_mod => ?MODULE, - zone => default, - listener => mqtt_tcp + conn_mod => ?MODULE }, ActiveN = emqx_gateway_utils:active_n(Options), %% TODO: RateLimit ? How ? @@ -260,6 +258,7 @@ init_state(Transport, Socket, Options) -> GcState = emqx_gateway_utils:init_gc_state(Options), StatsTimer = emqx_gateway_utils:stats_timer(Options), IdleTimeout = emqx_gateway_utils:idle_timeout(Options), + OomPolicy = emqx_gateway_utils:oom_policy(Options), IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout), #state{transport = Transport, socket = Socket, @@ -274,17 +273,16 @@ init_state(Transport, Socket, Options) -> gc_state = GcState, stats_timer = StatsTimer, idle_timeout = IdleTimeout, - idle_timer = IdleTimer + idle_timer = IdleTimer, + oom_policy = OomPolicy }. run_loop(Parent, State = #state{transport = Transport, socket = Socket, peername = Peername, - channel = _Channel}) -> + oom_policy = OomPolicy}) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - % TODO: How yo get oom_policy ??? - %emqx_misc:tune_heap_size(emqx_gateway_utils:oom_policy( - % emqx_stomp_channel:info(zone, Channel))), + _ = emqx_misc:tune_heap_size(OomPolicy), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); {error, Reason} -> @@ -806,16 +804,12 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> State#state{gc_state = GcSt1} end. -check_oom(State = #state{channel = Channel}) -> - Zone = emqx_stomp_channel:info(zone, Channel), - OomPolicy = emqx_gateway_utils:oom_policy(Zone), - ?tp(debug, check_oom, #{policy => OomPolicy}), +check_oom(State = #state{oom_policy = OomPolicy}) -> case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of {shutdown, Reason} -> %% triggers terminate/2 callback immediately erlang:exit({shutdown, Reason}); - _Other -> - ok + _Other -> ok end, State. @@ -847,28 +841,29 @@ close_socket(State = #state{transport = Transport, socket = Socket}) -> %% Inc incoming/outgoing stats %% XXX: Other packet type? -inc_incoming_stats(Packet = ?PACKET(Type)) -> +inc_incoming_stats(_Packet) -> inc_counter(recv_pkt, 1), - case Type =:= ?CMD_SEND of - true -> - inc_counter(recv_msg, 1), - inc_counter(incoming_pubs, 1); - false -> - ok - end, - %% FIXME: - emqx_metrics:inc_recv(Packet). + ok. + %case Type =:= ?CMD_SEND of + % true -> + % inc_counter(recv_msg, 1), + % inc_counter(incoming_pubs, 1); + % false -> + % ok + %end, + %emqx_metrics:inc_recv(Packet). -inc_outgoing_stats(Packet = ?PACKET(Type)) -> +inc_outgoing_stats(_Packet) -> inc_counter(send_pkt, 1), - case Type =:= ?CMD_MESSAGE of - true -> - inc_counter(send_msg, 1), - inc_counter(outgoing_pubs, 1); - false -> - ok - end, - emqx_metrics:inc_sent(Packet). + ok. + %case Type =:= ?CMD_MESSAGE of + % true -> + % inc_counter(send_msg, 1), + % inc_counter(outgoing_pubs, 1); + % false -> + % ok + %end, + %emqx_metrics:inc_sent(Packet). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index 4db8a1f5f..b77439ba7 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -98,7 +98,7 @@ -record(frame_limit, {max_header_num, max_header_length, max_body_length}). --type(parse_result() :: {ok, stomp_frame(), binary()} +-type(parse_result() :: {ok, stomp_frame(), binary(), parse_state()} | {more, parse_state()}). -type(parse_state() :: @@ -107,7 +107,7 @@ state := #parser_state{} }). --dialyzer({nowarn_function, [serialize_pkt/2,make/1]}). +%-dialyzer({nowarn_function, [serialize_pkt/2,make/1]}). %% @doc Initialize a parser -spec initial_parse_state(map()) -> parse_state(). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 86cce9c91..5592a7839 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -33,20 +33,19 @@ -define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]). --dialyzer({nowarn_function, [load/0]}). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- +-spec load() -> ok | {error, any()}. load() -> RegistryOptions = [ {cbkmod, ?MODULE} - , {schema, emqx_stomp_schema} ], YourOptions = [param1, param2], emqx_gateway_registry:load(stomp, RegistryOptions, YourOptions). +-spec unload() -> ok | {error, any()}. unload() -> emqx_gateway_registry:unload(stomp). diff --git a/apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl b/apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl index cffcb1bdf..9976f1da7 100644 --- a/apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl +++ b/apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl @@ -44,15 +44,16 @@ -define(CMD_RECEIPT, <<"RECEIPT">>). -define(CMD_ERROR, <<"ERROR">>). --type client_command() :: binary(). %-type client_command() :: ?CMD_SEND | ?CMD_SUBSCRIBE | ?CMD_UNSUBSCRIBE % | ?CMD_BEGIN | ?CMD_COMMIT | ?CMD_ABORT | ?CMD_ACK % | ?CMD_NACK | ?CMD_DISCONNECT | ?CMD_CONNECT % | ?CMD_STOMP. % --type server_command() :: binary(). +-type client_command() :: binary(). + %-type server_command() :: ?CMD_CONNECTED | ?CMD_MESSAGE | ?CMD_RECEIPT % | ?CMD_ERROR. +-type server_command() :: binary() | heartbeat. -record(stomp_frame, { command :: client_command() | server_command(),