chore: fix lot of running bugs
This commit is contained in:
parent
6f51b9f842
commit
0d9e0bd3fe
|
@ -245,8 +245,6 @@ filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_ESOCKD_LI
|
||||||
acceptors,
|
acceptors,
|
||||||
max_connections,
|
max_connections,
|
||||||
max_conn_rate,
|
max_conn_rate,
|
||||||
proxy_protocol,
|
|
||||||
proxy_protocol_timeout,
|
|
||||||
tcp_options,
|
tcp_options,
|
||||||
ssl_options,
|
ssl_options,
|
||||||
udp_options,
|
udp_options,
|
||||||
|
@ -261,13 +259,10 @@ filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_COWBOY_LI
|
||||||
acceptors,
|
acceptors,
|
||||||
max_connections,
|
max_connections,
|
||||||
max_conn_rate,
|
max_conn_rate,
|
||||||
proxy_protocol,
|
|
||||||
proxy_protocol_timeout,
|
|
||||||
tcp_options,
|
tcp_options,
|
||||||
ssl_options,
|
ssl_options,
|
||||||
udp_options,
|
udp_options,
|
||||||
dtls_options,
|
dtls_options
|
||||||
websocket
|
|
||||||
],
|
],
|
||||||
Conf1 = maps:without(CowboyKeys, RawCfg),
|
Conf1 = maps:without(CowboyKeys, RawCfg),
|
||||||
maps:merge(Conf0, Conf1).
|
maps:merge(Conf0, Conf1).
|
||||||
|
@ -536,7 +531,7 @@ ranch_opts(Type, ListenOn, Opts) ->
|
||||||
ws_opts(Opts, Conf) ->
|
ws_opts(Opts, Conf) ->
|
||||||
ConnMod = maps:get(connection_mod, Conf, emqx_gateway_conn),
|
ConnMod = maps:get(connection_mod, Conf, emqx_gateway_conn),
|
||||||
WsPaths = [
|
WsPaths = [
|
||||||
{emqx_utils_maps:deep_get([websocket, path], Opts, "/"), ConnMod, Conf}
|
{emqx_utils_maps:deep_get([websocket, path], Opts, "") ++ "/[...]", ConnMod, Conf}
|
||||||
],
|
],
|
||||||
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
|
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
|
||||||
ProxyProto = maps:get(proxy_protocol, Opts, false),
|
ProxyProto = maps:get(proxy_protocol, Opts, false),
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Licensed Work: EMQX Enterprise Edition
|
||||||
|
The Licensed Work is (c) 2023
|
||||||
|
Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Additional Use Grant: Students and educators are granted right to copy,
|
||||||
|
modify, and create derivative work for research
|
||||||
|
or education.
|
||||||
|
Change Date: 2027-02-01
|
||||||
|
Change License: Apache License, Version 2.0
|
||||||
|
|
||||||
|
For information about alternative licensing arrangements for the Software,
|
||||||
|
please contact Licensor: https://www.emqx.com/en/contact
|
||||||
|
|
||||||
|
Notice
|
||||||
|
|
||||||
|
The Business Source License (this document, or the “License”) is not an Open
|
||||||
|
Source license. However, the Licensed Work will eventually be made available
|
||||||
|
under an Open Source License, as stated in this License.
|
||||||
|
|
||||||
|
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||||
|
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||||
|
|
||||||
|
-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Terms
|
||||||
|
|
||||||
|
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||||
|
works, redistribute, and make non-production use of the Licensed Work. The
|
||||||
|
Licensor may make an Additional Use Grant, above, permitting limited
|
||||||
|
production use.
|
||||||
|
|
||||||
|
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||||
|
available distribution of a specific version of the Licensed Work under this
|
||||||
|
License, whichever comes first, the Licensor hereby grants you rights under
|
||||||
|
the terms of the Change License, and the rights granted in the paragraph
|
||||||
|
above terminate.
|
||||||
|
|
||||||
|
If your use of the Licensed Work does not comply with the requirements
|
||||||
|
currently in effect as described in this License, you must purchase a
|
||||||
|
commercial license from the Licensor, its affiliated entities, or authorized
|
||||||
|
resellers, or you must refrain from using the Licensed Work.
|
||||||
|
|
||||||
|
All copies of the original and modified Licensed Work, and derivative works
|
||||||
|
of the Licensed Work, are subject to this License. This License applies
|
||||||
|
separately for each version of the Licensed Work and the Change Date may vary
|
||||||
|
for each version of the Licensed Work released by Licensor.
|
||||||
|
|
||||||
|
You must conspicuously display this License on each original or modified copy
|
||||||
|
of the Licensed Work. If you receive the Licensed Work in original or
|
||||||
|
modified form from a third party, the terms and conditions set forth in this
|
||||||
|
License apply to your use of that work.
|
||||||
|
|
||||||
|
Any use of the Licensed Work in violation of this License will automatically
|
||||||
|
terminate your rights under this License for the current and all other
|
||||||
|
versions of the Licensed Work.
|
||||||
|
|
||||||
|
This License does not grant you any right in any trademark or logo of
|
||||||
|
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||||
|
Licensor as expressly required by this License).
|
||||||
|
|
||||||
|
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||||
|
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||||
|
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||||
|
TITLE.
|
||||||
|
|
||||||
|
MariaDB hereby grants you permission to use this License’s text to license
|
||||||
|
your works, and to refer to it using the trademark “Business Source License”,
|
||||||
|
as long as you comply with the Covenants of Licensor below.
|
||||||
|
|
||||||
|
Covenants of Licensor
|
||||||
|
|
||||||
|
In consideration of the right to use this License’s text and the “Business
|
||||||
|
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||||
|
other recipients of the licensed work to be provided by Licensor:
|
||||||
|
|
||||||
|
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||||
|
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||||
|
where “compatible” means that software provided under the Change License can
|
||||||
|
be included in a program with software provided under GPL Version 2.0 or a
|
||||||
|
later version. Licensor may specify additional Change Licenses without
|
||||||
|
limitation.
|
||||||
|
|
||||||
|
2. To either: (a) specify an additional grant of rights to use that does not
|
||||||
|
impose any additional restriction on the right granted in this License, as
|
||||||
|
the Additional Use Grant; or (b) insert the text “None”.
|
||||||
|
|
||||||
|
3. To specify a Change Date.
|
||||||
|
|
||||||
|
4. Not to modify this License in any other way.
|
|
@ -63,7 +63,7 @@
|
||||||
%% ClientInfo
|
%% ClientInfo
|
||||||
clientinfo :: emqx_types:clientinfo(),
|
clientinfo :: emqx_types:clientinfo(),
|
||||||
%% Session
|
%% Session
|
||||||
session :: maybe(emqx_session:session()),
|
session :: maybe(map()),
|
||||||
%% ClientInfo override specs
|
%% ClientInfo override specs
|
||||||
clientinfo_override :: map(),
|
clientinfo_override :: map(),
|
||||||
%% Keepalive
|
%% Keepalive
|
||||||
|
@ -163,18 +163,41 @@ info(clientid, #channel{clientinfo = ClientInfo}) ->
|
||||||
maps:get(clientid, ClientInfo, undefined);
|
maps:get(clientid, ClientInfo, undefined);
|
||||||
info(username, #channel{clientinfo = ClientInfo}) ->
|
info(username, #channel{clientinfo = ClientInfo}) ->
|
||||||
maps:get(username, ClientInfo, undefined);
|
maps:get(username, ClientInfo, undefined);
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{conninfo = ConnInfo}) ->
|
||||||
emqx_utils:maybe_apply(fun emqx_session:info/1, Session);
|
%% XXX:
|
||||||
|
#{
|
||||||
|
created_at => maps:get(connected_at, ConnInfo, undefined),
|
||||||
|
is_persistent => false,
|
||||||
|
subscriptions => #{},
|
||||||
|
upgrade_qos => false,
|
||||||
|
retry_interval => 0,
|
||||||
|
await_rel_timeout => 0
|
||||||
|
};
|
||||||
info(conn_state, #channel{conn_state = ConnState}) ->
|
info(conn_state, #channel{conn_state = ConnState}) ->
|
||||||
ConnState;
|
ConnState;
|
||||||
info(keepalive, #channel{keepalive = Keepalive}) ->
|
info(keepalive, #channel{keepalive = Keepalive}) ->
|
||||||
emqx_utils:maybe_apply(fun emqx_ocpp_keepalive:info/1, Keepalive);
|
emqx_utils:maybe_apply(fun emqx_ocpp_keepalive:info/1, Keepalive);
|
||||||
|
info(ctx, #channel{ctx = Ctx}) ->
|
||||||
|
Ctx;
|
||||||
info(timers, #channel{timers = Timers}) ->
|
info(timers, #channel{timers = Timers}) ->
|
||||||
Timers.
|
Timers.
|
||||||
|
|
||||||
-spec stats(channel()) -> emqx_types:stats().
|
-spec stats(channel()) -> emqx_types:stats().
|
||||||
stats(#channel{session = Session}) ->
|
stats(#channel{mqueue = MQueue}) ->
|
||||||
lists:append(emqx_session:stats(Session), emqx_pd:get_counters(?CHANNEL_METRICS)).
|
%% XXX:
|
||||||
|
SessionStats = [
|
||||||
|
{subscriptions_cnt, 0},
|
||||||
|
{subscriptions_max, 0},
|
||||||
|
{inflight_cnt, 0},
|
||||||
|
{inflight_max, 0},
|
||||||
|
{mqueue_len, queue:len(MQueue)},
|
||||||
|
{mqueue_max, queue:len(MQueue)},
|
||||||
|
{mqueue_dropped, 0},
|
||||||
|
{next_pkt_id, 0},
|
||||||
|
{awaiting_rel_cnt, 0},
|
||||||
|
{awaiting_rel_max, 0}
|
||||||
|
],
|
||||||
|
lists:append(SessionStats, emqx_pd:get_counters(?CHANNEL_METRICS)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Init the channel
|
%% Init the channel
|
||||||
|
@ -300,9 +323,9 @@ enrich_client(
|
||||||
|
|
||||||
fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
|
fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
fix_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
|
fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
|
||||||
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
|
||||||
ClientInfo#{mountpoint := MountPoint1}.
|
ClientInfo#{mountpoint := Mountpoint1}.
|
||||||
|
|
||||||
set_log_meta(#channel{
|
set_log_meta(#channel{
|
||||||
clientinfo = #{clientid := ClientId},
|
clientinfo = #{clientid := ClientId},
|
||||||
|
@ -353,14 +376,16 @@ publish(
|
||||||
clientid := ClientId,
|
clientid := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
protocol := Protocol,
|
protocol := Protocol,
|
||||||
peerhost := PeerHost
|
peerhost := PeerHost,
|
||||||
|
mountpoint := Mountpoint
|
||||||
},
|
},
|
||||||
conninfo = #{proto_ver := ProtoVer}
|
conninfo = #{proto_ver := ProtoVer}
|
||||||
}
|
}
|
||||||
) when
|
) when
|
||||||
is_map(Frame)
|
is_map(Frame)
|
||||||
->
|
->
|
||||||
Topic = upstream_topic(Frame, Channel),
|
Topic0 = upstream_topic(Frame, Channel),
|
||||||
|
Topic = emqx_mountpoint:mount(Mountpoint, Topic0),
|
||||||
Payload = frame2payload(Frame),
|
Payload = frame2payload(Frame),
|
||||||
emqx_broker:publish(
|
emqx_broker:publish(
|
||||||
emqx_message:make(
|
emqx_message:make(
|
||||||
|
@ -386,14 +411,14 @@ upstream_topic(
|
||||||
case Type of
|
case Type of
|
||||||
?OCPP_MSG_TYPE_ID_CALL ->
|
?OCPP_MSG_TYPE_ID_CALL ->
|
||||||
Action = maps:get(action, Frame),
|
Action = maps:get(action, Frame),
|
||||||
emqx_placeholder:proc_tmpl(
|
proc_tmpl(
|
||||||
emqx_ocpp_conf:uptopic(Action),
|
emqx_ocpp_conf:uptopic(Action),
|
||||||
Vars#{action => Action}
|
Vars#{action => Action}
|
||||||
);
|
);
|
||||||
?OCPP_MSG_TYPE_ID_CALLRESULT ->
|
?OCPP_MSG_TYPE_ID_CALLRESULT ->
|
||||||
emqx_placeholder:proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars);
|
proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars);
|
||||||
?OCPP_MSG_TYPE_ID_CALLERROR ->
|
?OCPP_MSG_TYPE_ID_CALLERROR ->
|
||||||
emqx_placeholder:proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars)
|
proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -589,27 +614,20 @@ process_connect(
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_subscribe_dn_topics(
|
ensure_subscribe_dn_topics(
|
||||||
Channel = #channel{
|
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
|
||||||
clientinfo = #{clientid := ClientId} = ClientInfo,
|
|
||||||
session = Session
|
|
||||||
}
|
|
||||||
) ->
|
) ->
|
||||||
TopicTokens = emqx_ocpp_conf:dntopic(),
|
|
||||||
SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1},
|
SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1},
|
||||||
Topic = emqx_placeholder:proc_tmpl(
|
Topic0 = proc_tmpl(
|
||||||
TopicTokens,
|
emqx_ocpp_conf:dntopic(),
|
||||||
#{
|
#{
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
cid => ClientId
|
cid => ClientId
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{ok, NSession} = emqx_session:subscribe(
|
Topic = emqx_mountpoint:mount(Mountpoint, Topic0),
|
||||||
ClientInfo,
|
ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
|
||||||
Topic,
|
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
|
||||||
SubOpts,
|
Channel.
|
||||||
Session
|
|
||||||
),
|
|
||||||
Channel#channel{session = NSession}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle timeout
|
%% Handle timeout
|
||||||
|
@ -694,10 +712,8 @@ terminate({shutdown, Reason}, Channel) when
|
||||||
terminate(Reason, Channel) ->
|
terminate(Reason, Channel) ->
|
||||||
run_terminate_hook(Reason, Channel).
|
run_terminate_hook(Reason, Channel).
|
||||||
|
|
||||||
run_terminate_hook(_Reason, #channel{session = undefined}) ->
|
run_terminate_hook(Reason, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||||
ok;
|
emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(session, Channel)]).
|
||||||
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
|
|
||||||
emqx_session:terminate(ClientInfo, Reason, Session).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -795,12 +811,11 @@ ensure_disconnected(
|
||||||
Reason,
|
Reason,
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
conninfo = ConnInfo,
|
conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo = #{clientid := ClientId}
|
clientinfo = ClientInfo
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
||||||
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
||||||
emqx_cm:unregister_channel(ClientId),
|
|
||||||
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -865,6 +880,10 @@ shutdown(success, Reply, Channel) ->
|
||||||
shutdown(Reason, Reply, Channel) ->
|
shutdown(Reason, Reply, Channel) ->
|
||||||
{shutdown, Reason, Reply, Channel}.
|
{shutdown, Reason, Reply, Channel}.
|
||||||
|
|
||||||
|
proc_tmpl(Tmpl, Vars) ->
|
||||||
|
Tokens = emqx_placeholder:preproc_tmpl(Tmpl),
|
||||||
|
emqx_placeholder:proc_tmpl(Tokens, Vars).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% For CT tests
|
%% For CT tests
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -17,18 +17,10 @@
|
||||||
%% Conf modules for emqx-ocpp gateway
|
%% Conf modules for emqx-ocpp gateway
|
||||||
-module(emqx_ocpp_conf).
|
-module(emqx_ocpp_conf).
|
||||||
|
|
||||||
-export([
|
|
||||||
load/1,
|
|
||||||
unload/0,
|
|
||||||
get_env/1,
|
|
||||||
get_env/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
default_heartbeat_interval/0,
|
default_heartbeat_interval/0,
|
||||||
heartbeat_checking_times_backoff/0,
|
heartbeat_checking_times_backoff/0,
|
||||||
retry_interval/0,
|
retry_interval/0,
|
||||||
awaiting_timeout/0,
|
|
||||||
message_format_checking/0,
|
message_format_checking/0,
|
||||||
max_mqueue_len/0,
|
max_mqueue_len/0,
|
||||||
strit_mode/1,
|
strit_mode/1,
|
||||||
|
@ -38,29 +30,18 @@
|
||||||
dntopic/0
|
dntopic/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(KEY(Key), {?MODULE, Key}).
|
-define(KEY(K), [gateway, ocpp, K]).
|
||||||
|
|
||||||
load(Confs) ->
|
conf(K, Default) ->
|
||||||
lists:foreach(fun({K, V}) -> store(K, V) end, Confs).
|
emqx_config:get(?KEY(K), Default).
|
||||||
|
|
||||||
get_env(K) ->
|
|
||||||
get_env(K, undefined).
|
|
||||||
|
|
||||||
get_env(K, Default) ->
|
|
||||||
try
|
|
||||||
persistent_term:get(?KEY(K))
|
|
||||||
catch
|
|
||||||
error:badarg ->
|
|
||||||
Default
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec default_heartbeat_interval() -> pos_integer().
|
-spec default_heartbeat_interval() -> pos_integer().
|
||||||
default_heartbeat_interval() ->
|
default_heartbeat_interval() ->
|
||||||
get_env(default_heartbeat_interval, 600).
|
conf(default_heartbeat_interval, 600).
|
||||||
|
|
||||||
-spec heartbeat_checking_times_backoff() -> pos_integer().
|
-spec heartbeat_checking_times_backoff() -> pos_integer().
|
||||||
heartbeat_checking_times_backoff() ->
|
heartbeat_checking_times_backoff() ->
|
||||||
get_env(heartbeat_checking_times_backoff, 1).
|
conf(heartbeat_checking_times_backoff, 1).
|
||||||
|
|
||||||
-spec strit_mode(upstream | dnstream) -> boolean().
|
-spec strit_mode(upstream | dnstream) -> boolean().
|
||||||
strit_mode(dnstream) ->
|
strit_mode(dnstream) ->
|
||||||
|
@ -76,21 +57,17 @@ retry_interval() ->
|
||||||
max_mqueue_len() ->
|
max_mqueue_len() ->
|
||||||
dnstream(max_mqueue_len, 10).
|
dnstream(max_mqueue_len, 10).
|
||||||
|
|
||||||
-spec awaiting_timeout() -> pos_integer().
|
|
||||||
awaiting_timeout() ->
|
|
||||||
upstream(awaiting_timeout, 30).
|
|
||||||
|
|
||||||
-spec message_format_checking() ->
|
-spec message_format_checking() ->
|
||||||
all
|
all
|
||||||
| upstream_only
|
| upstream_only
|
||||||
| dnstream_only
|
| dnstream_only
|
||||||
| disable.
|
| disable.
|
||||||
message_format_checking() ->
|
message_format_checking() ->
|
||||||
get_env(message_format_checking, all).
|
conf(message_format_checking, all).
|
||||||
|
|
||||||
uptopic(Action) ->
|
uptopic(Action) ->
|
||||||
Topic = upstream(topic),
|
Topic = upstream(topic),
|
||||||
Mapping = upstream(mapping, #{}),
|
Mapping = upstream(topic_override_mapping, #{}),
|
||||||
maps:get(Action, Mapping, Topic).
|
maps:get(Action, Mapping, Topic).
|
||||||
|
|
||||||
up_reply_topic() ->
|
up_reply_topic() ->
|
||||||
|
@ -102,16 +79,6 @@ up_error_topic() ->
|
||||||
dntopic() ->
|
dntopic() ->
|
||||||
dnstream(topic).
|
dnstream(topic).
|
||||||
|
|
||||||
-spec unload() -> ok.
|
|
||||||
unload() ->
|
|
||||||
lists:foreach(
|
|
||||||
fun
|
|
||||||
({?KEY(K), _}) -> persistent_term:erase(?KEY(K));
|
|
||||||
(_) -> ok
|
|
||||||
end,
|
|
||||||
persistent_term:get()
|
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% internal funcs
|
%% internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -120,34 +87,10 @@ dnstream(K) ->
|
||||||
dnstream(K, undefined).
|
dnstream(K, undefined).
|
||||||
|
|
||||||
dnstream(K, Def) ->
|
dnstream(K, Def) ->
|
||||||
L = get_env(dnstream, []),
|
emqx_config:get([gateway, ocpp, dnstream, K], Def).
|
||||||
proplists:get_value(K, L, Def).
|
|
||||||
|
|
||||||
upstream(K) ->
|
upstream(K) ->
|
||||||
upstream(K, undefined).
|
upstream(K, undefined).
|
||||||
|
|
||||||
upstream(K, Def) ->
|
upstream(K, Def) ->
|
||||||
L = get_env(upstream, []),
|
emqx_config:get([gateway, ocpp, upstream, K], Def).
|
||||||
proplists:get_value(K, L, Def).
|
|
||||||
|
|
||||||
store(upstream, L) ->
|
|
||||||
L1 = preproc([topic, reply_topic, error_topic], L),
|
|
||||||
Mapping = proplists:get_value(mapping, L1, #{}),
|
|
||||||
NMappings = maps:map(
|
|
||||||
fun(_, V) -> emqx_placeholder:preproc_tmpl(V) end,
|
|
||||||
Mapping
|
|
||||||
),
|
|
||||||
L2 = lists:keyreplace(mapping, 1, L1, {mapping, NMappings}),
|
|
||||||
persistent_term:put(?KEY(upstream), L2);
|
|
||||||
store(dnstream, L) ->
|
|
||||||
L1 = preproc([topic], L),
|
|
||||||
persistent_term:put(?KEY(dnstream), L1);
|
|
||||||
store(K, V) ->
|
|
||||||
persistent_term:put(?KEY(K), V).
|
|
||||||
|
|
||||||
preproc([], L) ->
|
|
||||||
L;
|
|
||||||
preproc([Key | More], L) ->
|
|
||||||
Val0 = proplists:get_value(Key, L),
|
|
||||||
Val = emqx_placeholder:preproc_tmpl(Val0),
|
|
||||||
preproc(More, lists:keyreplace(Key, 1, L, {Key, Val})).
|
|
||||||
|
|
|
@ -204,13 +204,13 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) ->
|
||||||
|
|
||||||
init(Req, Opts) ->
|
init(Req, Opts) ->
|
||||||
%% WS Transport Idle Timeout
|
%% WS Transport Idle Timeout
|
||||||
IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000),
|
IdleTimeout = maps:get(idle_timeout, Opts, 7200000),
|
||||||
MaxFrameSize =
|
MaxFrameSize =
|
||||||
case proplists:get_value(max_frame_size, Opts, 0) of
|
case maps:get(max_frame_size, Opts, 0) of
|
||||||
0 -> infinity;
|
0 -> infinity;
|
||||||
I -> I
|
I -> I
|
||||||
end,
|
end,
|
||||||
Compress = proplists:get_bool(compress, Opts),
|
Compress = emqx_utils_maps:deep_get([websocket, compress], Opts),
|
||||||
WsOpts = #{
|
WsOpts = #{
|
||||||
compress => Compress,
|
compress => Compress,
|
||||||
max_frame_size => MaxFrameSize,
|
max_frame_size => MaxFrameSize,
|
||||||
|
@ -270,7 +270,7 @@ init_state_and_channel([Req, Opts, _WsOpts], _State = undefined) ->
|
||||||
},
|
},
|
||||||
Limiter = undeined,
|
Limiter = undeined,
|
||||||
ActiveN = emqx_gateway_utils:active_n(Opts),
|
ActiveN = emqx_gateway_utils:active_n(Opts),
|
||||||
Piggyback = proplists:get_value(piggyback, Opts, multiple),
|
Piggyback = emqx_utils_maps:deep_get([websocket, piggyback], Opts, multiple),
|
||||||
ParseState = emqx_ocpp_frame:initial_parse_state(#{}),
|
ParseState = emqx_ocpp_frame:initial_parse_state(#{}),
|
||||||
Serialize = emqx_ocpp_frame:serialize_opts(),
|
Serialize = emqx_ocpp_frame:serialize_opts(),
|
||||||
Channel = emqx_ocpp_channel:init(ConnInfo, Opts),
|
Channel = emqx_ocpp_channel:init(ConnInfo, Opts),
|
||||||
|
@ -303,7 +303,7 @@ init_state_and_channel([Req, Opts, _WsOpts], _State = undefined) ->
|
||||||
|
|
||||||
peername_and_cert(Req, Opts) ->
|
peername_and_cert(Req, Opts) ->
|
||||||
case
|
case
|
||||||
proplists:get_bool(proxy_protocol, Opts) andalso
|
maps:get(proxy_protocol, Opts, false) andalso
|
||||||
maps:get(proxy_header, Req)
|
maps:get(proxy_header, Req)
|
||||||
of
|
of
|
||||||
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
||||||
|
@ -323,8 +323,8 @@ peername_and_cert(Req, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_sec_websocket_protocol([Req, Opts, WsOpts], State) ->
|
parse_sec_websocket_protocol([Req, Opts, WsOpts], State) ->
|
||||||
SupportedSubprotocols = proplists:get_value(supported_subprotocols, Opts),
|
SupportedSubprotocols = emqx_utils_maps:deep_get([websocket, supported_subprotocols], Opts),
|
||||||
FailIfNoSubprotocol = proplists:get_value(fail_if_no_subprotocol, Opts),
|
FailIfNoSubprotocol = emqx_utils_maps:deep_get([websocket, fail_if_no_subprotocol], Opts),
|
||||||
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
||||||
undefined ->
|
undefined ->
|
||||||
case FailIfNoSubprotocol of
|
case FailIfNoSubprotocol of
|
||||||
|
@ -402,7 +402,7 @@ auth_connect([Req, Opts, _WsOpts], State = #state{channel = Channel}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_clientid(Req, Opts) ->
|
parse_clientid(Req, Opts) ->
|
||||||
PathPrefix = proplists:get_value(ocpp_path, Opts),
|
PathPrefix = emqx_utils_maps:deep_get([websocket, path], Opts),
|
||||||
[_, ClientId0] = binary:split(
|
[_, ClientId0] = binary:split(
|
||||||
cowboy_req:path(Req),
|
cowboy_req:path(Req),
|
||||||
iolist_to_binary(PathPrefix ++ "/")
|
iolist_to_binary(PathPrefix ++ "/")
|
||||||
|
@ -426,12 +426,12 @@ parse_protocol_name(<<"ocpp1.6">>) ->
|
||||||
parse_header_fun_origin(Req, Opts) ->
|
parse_header_fun_origin(Req, Opts) ->
|
||||||
case cowboy_req:header(<<"origin">>, Req) of
|
case cowboy_req:header(<<"origin">>, Req) of
|
||||||
undefined ->
|
undefined ->
|
||||||
case proplists:get_bool(allow_origin_absence, Opts) of
|
case emqx_utils_maps:deep_get([websocket, allow_origin_absence], Opts) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> {error, origin_header_cannot_be_absent}
|
false -> {error, origin_header_cannot_be_absent}
|
||||||
end;
|
end;
|
||||||
Value ->
|
Value ->
|
||||||
Origins = proplists:get_value(check_origins, Opts, []),
|
Origins = emqx_utils_maps:deep_get([websocket, check_origins], Opts, []),
|
||||||
case lists:member(Value, Origins) of
|
case lists:member(Value, Origins) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> {error, {origin_not_allowed, Value}}
|
false -> {error, {origin_not_allowed, Value}}
|
||||||
|
@ -439,7 +439,7 @@ parse_header_fun_origin(Req, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_origin_header(Req, Opts) ->
|
check_origin_header(Req, Opts) ->
|
||||||
case proplists:get_bool(check_origin_enable, Opts) of
|
case emqx_utils_maps:deep_get([websocket, check_origin_enable], Opts) of
|
||||||
true -> parse_header_fun_origin(Req, Opts);
|
true -> parse_header_fun_origin(Req, Opts);
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
@ -547,9 +547,15 @@ handle_info({connack, ConnAck}, State) ->
|
||||||
handle_info({close, Reason}, State) ->
|
handle_info({close, Reason}, State) ->
|
||||||
?SLOG(debug, #{msg => "force_to_close_socket", reason => Reason}),
|
?SLOG(debug, #{msg => "force_to_close_socket", reason => Reason}),
|
||||||
return(enqueue({close, Reason}, State));
|
return(enqueue({close, Reason}, State));
|
||||||
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
handle_info({event, connected}, State = #state{chann_mod = ChannMod, channel = Channel}) ->
|
||||||
ClientId = emqx_ocpp_channel:info(clientid, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:insert_channel_info(
|
||||||
|
Ctx,
|
||||||
|
ClientId,
|
||||||
|
info(State),
|
||||||
|
stats(State)
|
||||||
|
),
|
||||||
return(State);
|
return(State);
|
||||||
handle_info({event, disconnected}, State = #state{chann_mod = ChannMod, channel = Channel}) ->
|
handle_info({event, disconnected}, State = #state{chann_mod = ChannMod, channel = Channel}) ->
|
||||||
Ctx = ChannMod:info(ctx, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
|
@ -577,12 +583,14 @@ handle_timeout(
|
||||||
TRef,
|
TRef,
|
||||||
emit_stats,
|
emit_stats,
|
||||||
State = #state{
|
State = #state{
|
||||||
|
chann_mod = ChannMod,
|
||||||
channel = Channel,
|
channel = Channel,
|
||||||
stats_timer = TRef
|
stats_timer = TRef
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
ClientId = emqx_ocpp_channel:info(clientid, Channel),
|
Ctx = ChannMod:info(ctx, Channel),
|
||||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
ClientId = ChannMod:info(clientid, Channel),
|
||||||
|
emqx_gateway_ctx:set_chan_stats(Ctx, ClientId, stats(State)),
|
||||||
return(State#state{stats_timer = undefined});
|
return(State#state{stats_timer = undefined});
|
||||||
handle_timeout(TRef, TMsg, State) ->
|
handle_timeout(TRef, TMsg, State) ->
|
||||||
with_channel(handle_timeout, [TRef, TMsg], State).
|
with_channel(handle_timeout, [TRef, TMsg], State).
|
||||||
|
@ -852,7 +860,9 @@ trigger(Event) -> erlang:send(self(), Event).
|
||||||
|
|
||||||
get_peer(Req, Opts) ->
|
get_peer(Req, Opts) ->
|
||||||
{PeerAddr, PeerPort} = cowboy_req:peer(Req),
|
{PeerAddr, PeerPort} = cowboy_req:peer(Req),
|
||||||
AddrHeader = cowboy_req:header(proplists:get_value(proxy_address_header, Opts), Req, <<>>),
|
AddrHeader = cowboy_req:header(
|
||||||
|
emqx_utils_maps:deep_get([websocket, proxy_address_header], Opts), Req, <<>>
|
||||||
|
),
|
||||||
ClientAddr =
|
ClientAddr =
|
||||||
case string:tokens(binary_to_list(AddrHeader), ", ") of
|
case string:tokens(binary_to_list(AddrHeader), ", ") of
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -867,7 +877,9 @@ get_peer(Req, Opts) ->
|
||||||
_ ->
|
_ ->
|
||||||
PeerAddr
|
PeerAddr
|
||||||
end,
|
end,
|
||||||
PortHeader = cowboy_req:header(proplists:get_value(proxy_port_header, Opts), Req, <<>>),
|
PortHeader = cowboy_req:header(
|
||||||
|
emqx_utils_maps:deep_get([websocket, proxy_port_header], Opts), Req, <<>>
|
||||||
|
),
|
||||||
ClientPort =
|
ClientPort =
|
||||||
case string:tokens(binary_to_list(PortHeader), ", ") of
|
case string:tokens(binary_to_list(PortHeader), ", ") of
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -32,7 +32,7 @@ load() ->
|
||||||
disable ->
|
disable ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
case feedvar(emqx_ocpp_conf:get_env(json_schema_dir)) of
|
case feedvar(emqx_config:get([gateway, ocpp, json_schema_dir])) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
Dir ->
|
Dir ->
|
||||||
|
@ -94,11 +94,11 @@ feedvar(Path) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
schema_id(?OCPP_MSG_TYPE_ID_CALL, Action) when is_binary(Action) ->
|
schema_id(?OCPP_MSG_TYPE_ID_CALL, Action) when is_binary(Action) ->
|
||||||
emqx_ocpp_conf:get_env(json_schema_id_prefix) ++
|
emqx_config:get([gateway, ocpp, json_schema_id_prefix]) ++
|
||||||
binary_to_list(Action) ++
|
binary_to_list(Action) ++
|
||||||
"Request";
|
"Request";
|
||||||
schema_id(?OCPP_MSG_TYPE_ID_CALLRESULT, Action) when is_binary(Action) ->
|
schema_id(?OCPP_MSG_TYPE_ID_CALLRESULT, Action) when is_binary(Action) ->
|
||||||
emqx_ocpp_conf:get_env(json_schema_id_prefix) ++
|
emqx_config:get([gateway, ocpp, json_schema_id_prefix]) ++
|
||||||
binary_to_list(Action) ++
|
binary_to_list(Action) ++
|
||||||
"Response".
|
"Response".
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue