ClientMqtt

  
class ClientMqtt extends ClientProxy {
  constructor(options: MqttClientOptions & { url?: string; serializer?: Serializer<any, any>; deserializer?: Deserializer<any, any>; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; })
  protected logger: Logger
  protected subscriptionsCount: Map<string, number>
  protected url: string
  protected mqttClient: MqttClient | null
  protected connectionPromise: Promise<any> | null
  protected isInitialConnection: false
  protected isReconnecting: false
  protected pendingEventListeners: Array<{...}
  protected options: Required<MqttOptions>['options']
  getRequestPattern(pattern: string): string
  getResponsePattern(pattern: string): string
  close()
  connect(): Promise<any>
  mergeCloseEvent<T = any>(instance: any, source$: Observable<T>): Observable<T>
  createClient(): MqttClient
  registerErrorListener(client: any)
  registerOfflineListener(client: any)
  registerReconnectListener(client: any)
  registerDisconnectListener(client: any)
  registerCloseListener(client: any)
  registerConnectListener(client: any)
  on<EventKey extends keyof MqttEvents = keyof MqttEvents, EventCallback extends MqttEvents[EventKey] = MqttEvents[EventKey]>(event: EventKey, callback: EventCallback)
  unwrap<T>(): T
  createResponseCallback(): (channel: string, buffer: Buffer) => any
  protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void
  protected dispatchEvent(packet: ReadPacket<any>): Promise<any>
  protected unsubscribeFromChannel(channel: string)
  protected initializeSerializer(options: MqttClientOptions & { url?: string; serializer?: Serializer<any, any>; deserializer?: Deserializer<any, any>; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; })
  protected mergePacketOptions(requestOptions?: MqttRecordOptions): MqttRecordOptions | undefined

  // inherited from nest/packages/microservices/ClientProxy
  protected routingMap: Map<string, Function>
  protected serializer: ProducerSerializer
  protected deserializer: ProducerDeserializer
  protected _status$: ReplaySubject<Status>
  status: Observable<Status>
  abstract connect(): Promise<any>
  abstract close(): any
  on<EventKey extends keyof EventsMap = keyof EventsMap, EventCallback extends EventsMap[EventKey] = EventsMap[EventKey]>(event: EventKey, callback: EventCallback)
  abstract unwrap<T>(): T
  send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>
  protected abstract publish(packet: ReadPacket<any>, callback: (packet: WritePacket<any>) => void): () => void
  protected abstract dispatchEvent<T = any>(packet: ReadPacket<any>): Promise<T>
  protected createObserver<T>(observer: Observer<T>): (packet: WritePacket) => void
  protected serializeError(err: any): any
  protected serializeResponse(response: any): any
  protected assignPacketId(packet: ReadPacket<any>): ReadPacket & PacketId
  protected connect$(instance: any, errorEvent: string = 'error', connectEvent: string = 'connect'): Observable<any>
  protected getOptionsProp<Options extends ClientOptions['options'], Attribute extends keyof Options, DefaultValue extends Options[Attribute] = Options[Attribute]>(obj: Options, prop: Attribute, defaultValue: DefaultValue = undefined as DefaultValue)
  protected normalizePattern(pattern: MsPattern): string
  protected initializeSerializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
  protected initializeDeserializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
}

Constructor


constructor(options: MqttClientOptions & { url?: string; serializer?: Serializer<any, any>; deserializer?: Deserializer<any, any>; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; })

Parameters

Option Type Description
options MqttClientOptions & { url?: string; serializer?: Serializer; deserializer?: Deserializer; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; }

Properties

Property Description
protected logger: Logger Read-only.
protected subscriptionsCount: Map<string, number> Read-only.
protected url: string Read-only.
protected mqttClient: MqttClient | null
protected connectionPromise: Promise<any> | null
protected isInitialConnection: false
protected isReconnecting: false
protected pendingEventListeners: Array<{ event: keyof MqttEvents; callback: MqttEvents[keyof MqttEvents]; }>
protected options: Required<MqttOptions>['options'] Read-only. Declared in constructor.

Methods

getRequestPattern()


getRequestPattern(pattern: string): string

Parameters

Option Type Description
pattern string

Returns

string

getResponsePattern()


getResponsePattern(pattern: string): string

Parameters

Option Type Description
pattern string

Returns

string

close()


close()

Parameters

There are no parameters.

connect()


connect(): Promise<any>

Parameters

There are no parameters.

Returns

Promise<any>

mergeCloseEvent()


mergeCloseEvent<T = any>(instance: any, source$: Observable<T>): Observable<T>

Parameters

Option Type Description
instance any
source$ Observable

Returns

Observable<T>

createClient()


createClient(): MqttClient

Parameters

There are no parameters.

Returns

MqttClient

registerErrorListener()


registerErrorListener(client: any)

Parameters

Option Type Description
client any

registerOfflineListener()


registerOfflineListener(client: any)

Parameters

Option Type Description
client any

registerReconnectListener()


registerReconnectListener(client: any)

Parameters

Option Type Description
client any

registerDisconnectListener()


registerDisconnectListener(client: any)

Parameters

Option Type Description
client any

registerCloseListener()


registerCloseListener(client: any)

Parameters

Option Type Description
client any

registerConnectListener()


registerConnectListener(client: any)

Parameters

Option Type Description
client any

on()


on<EventKey extends keyof MqttEvents = keyof MqttEvents, EventCallback extends MqttEvents[EventKey] = MqttEvents[EventKey]>(event: EventKey, callback: EventCallback)

Parameters

Option Type Description
event EventKey
callback EventCallback

unwrap()


unwrap<T>(): T

Parameters

There are no parameters.

Returns

T

createResponseCallback()


createResponseCallback(): (channel: string, buffer: Buffer) => any

Parameters

There are no parameters.

Returns

(channel: string, buffer: Buffer) => any

publish()


protected publish(partialPacket: ReadPacket<any>, callback: (packet: WritePacket<any>) => any): () => void

Parameters

Option Type Description
partialPacket ReadPacket
callback (packet: WritePacket) => any

Returns

() => void

dispatchEvent()


protected dispatchEvent(packet: ReadPacket<any>): Promise<any>

Parameters

Option Type Description
packet ReadPacket

Returns

Promise<any>

unsubscribeFromChannel()


protected unsubscribeFromChannel(channel: string)

Parameters

Option Type Description
channel string

initializeSerializer()


protected initializeSerializer(options: MqttClientOptions & { url?: string; serializer?: Serializer<any, any>; deserializer?: Deserializer<any, any>; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; })

Parameters

Option Type Description
options MqttClientOptions & { url?: string; serializer?: Serializer; deserializer?: Deserializer; subscribeOptions?: { qos: QoS; nl?: boolean; rap?: boolean; rh?: number; }; userProperties?: Record<...>; }

mergePacketOptions()


protected mergePacketOptions(requestOptions?: MqttRecordOptions): MqttRecordOptions | undefined

Parameters

Option Type Description
requestOptions MqttRecordOptions

Optional. Default is undefined.

Returns

MqttRecordOptions | undefined