const ConsumerLib = require('solclient-message-consumer');
const PublisherLib = require('solclient-message-publisher');
const SMFLib = require('solclient-smf');
const SolclientFactoryLib = require('solclient-factory');
const TransportLib = require('solclient-transport');

const { assert } = require('solclient-eskit');
const { CapabilityType } = require('./capability-types');
const { Check } = require('solclient-validate');
const { CorrelatedRequest } = require('./correlated-request');
const { Destination } = require('solclient-destination');
const { ErrorResponseSubcodeMapper,
        ErrorSubcode,
        OperationError } = require('solclient-error');
const { FsmEvent,
        State,
        StateMachine } = require('solclient-fsm');
const { Hex } = require('solclient-convert');
const { LogFormatter } = require('solclient-log');
const { Message,
        MessageOutcome,
        MessageDeliveryModeType } = require('solclient-message');
const { P2PUtil } = require('./p2p-util');
const { SessionEvent } = require('./session-event');
const { SessionEventCode } = require('./session-event-codes');
const { SessionEventName } = require('./session-event-names');
const { SessionFSMEvent } = require('./session-fsm-event');
const { SessionRequestType } = require('./session-request-types');
const { SessionStateName } = require('./session-state-names');
const { SslDowngrade } = require('./ssl-downgrades');
const { StatType,
        StatsByMode } = require('solclient-stats');
const { StringUtils } = require('solclient-util');
const { SubscriptionUpdateTimeoutMessages } = require('./subscription-update-timeout-messages');

const { formatHexString } = Hex;
const { stripNullTerminate } = StringUtils;
const { STAT_TX_BYMODE_BYTES,
        STAT_TX_BYMODE_MSGS,
        STAT_RX_BYMODE_BYTES,
        STAT_RX_BYMODE_MSGS,
        STAT_TX_BYMODE_REDELIVERED,
        STAT_TX_BYMODE_BYTES_REDELIVERED } = StatsByMode;

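/**
 * State machine that drives a session through its connecting, transport-up,
 * disconnecting and disconnected states.
 *
 * @param {SessionProperties} sessionProperties The shared reference to the properties of the
 *  owning session
 * @param {Session} session The session owning this FSM
 * @param {Object} stats Statistics object; stored by the FSM as `_sessionStatistics`
 * @param {Object} hosts Host list the FSM resolves and iterates over while connecting;
 *  stored as `_hosts`
 * @extends StateMachine
 * @private
 */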
class SessionFSM extends StateMachine {
  constructor(sessionProperties, session, stats, hosts) {
    super({ name: 'SessionFSM' });
    const fsm = this;
    // Prefixes every log record with the session id (hex) and the current state name,
    // followed by the caller's own arguments in their original order.
    const logFormatter = function logFormatter(...args) {
      const sessionTag = `[session-fsm=${fsm.sessionIdHex || '(N/A)'}]`;
      const stateTag = `[${fsm.getCurrentStateName()}]`;
      return [sessionTag, stateTag].concat(args);
    };
    const logger = this.logger = new LogFormatter(logFormatter);
    const { LOG_TRACE, LOG_DEBUG, LOG_INFO } = logger;
    this.log = logger.wrap(this.log, this);
    this._sessionProperties = sessionProperties;
    this._session = session;
    this._sessionStatistics = stats;
    this._hosts = hosts;
    this._consumers = new ConsumerLib.ConsumerFlows();
    // Factory for the interface object handed to a flow. It is called with the flow itself
    // so that the send helpers can be partially applied with it; the remaining members are
    // bound to this FSM or to the owning session.
    this._flowInterfaceFactory = (flow) => {
      const flowInterface = {
        getCorrelationTag:               fsm.getCorrelationTag.bind(fsm),
        incStat:                         fsm.incStat.bind(fsm),
        sendData:                        message => fsm.send(message, flow, false),
        sendToTransport:                 message => fsm.sendToTransport(message, flow, false),
        sendControl:                     message => fsm.send(message, flow, true),
        enqueueRequest:                  fsm.enqueueOutstandingCorrelatedReq.bind(fsm),
        createDestinationFromDescriptor: session.createDestinationFromDescriptor.bind(session),
        createTemporaryDestination:      session.createTemporaryDestination.bind(session),
        isCapable:                       session.isCapable.bind(session),
        getCapability:                   session.getCapability.bind(session),
        getCurrentStateName:             fsm.getCurrentStateName.bind(fsm),
        updateQueueSubscription:         session.updateQueueSubscription.bind(session),

        // Live views: evaluated on every access rather than captured at creation time.
        get sessionIdHex() {
          return fsm.sessionIdHex;
        },
        get canAck() {
          return session.canAck;
        },
      };
      return flowInterface;
    };

    // Keeping track of transport backpressure (but not AD window exhaustion),
    // so we can relieve it on transport reconnect:
    this._userBackpressured = false;

    this.clearCurrentError();

    // Initial transition of the machine: go to SessionDisconnected, resetting the state
    // machine as the transition action.
    this.initial(function onInitial() {
      const resetStateMachine = context => context.getStateMachine().reset();
      return this.transitionTo(fsm.SessionDisconnected, resetStateMachine);
    });

    /**
     * Reaction registered for events that were left unhandled.
     * CREATE_SUBSCRIBER queues the consumer, DISPOSE terminates the current state and
     * disposes the FSM, everything else is logged and ignored.
     * @param {SessionFSMEvent} sEvent The unhandled event
     * @private
     */
    this.unhandledEventReaction(function onUnhandledEvent(sEvent) {
      const curState = fsm.getCurrentState();
      const eventName = sEvent.getName();

      if (eventName === SessionEventName.CREATE_SUBSCRIBER) {
        // Only the connected state handles CREATE_SUBSCRIBER itself. Anywhere else the
        // consumer is just remembered so it can be bound once the session comes up.
        assert(fsm._consumers, 'collection has lifetime of FSM instance');
        fsm._consumers.add(sEvent.guaranteedFlowObject);
        return this;
      }

      if (eventName === SessionEventName.DISPOSE) {
        LOG_DEBUG('Handling DISPOSE');
        return curState.terminate(() => fsm.disposeInternal());
      }

      if (eventName === SessionEventName.FLOW_UP) {
        LOG_DEBUG(`Ignoring FLOW_UP event from ${sEvent.guaranteedFlowObject}`);
        return this;
      }

      LOG_TRACE(`Ignoring event ${sEvent.getName()} in state ${fsm.getCurrentStateName()}`);
      return this;
    });

    // Composite state covering every step between a connect request and a usable session.
    // Its sub-states (declared below with parentContext SessionConnecting) handle DNS
    // resolution, transport creation/teardown, login, transport change, subscription
    // reapply and publisher bind.
    // Entry points:
    //   - initial:               fresh connect (CONNECT_FAILED_ERROR / UP_NOTICE)
    //   - 'DisconnectTransport': tear the transport down and finish in DISCONNECTED
    //   - 'ReconnectTransport':  tear the transport down and try the host list again
    // Exit points:
    //   - 'ConnectedExit': emits _connectSuccessEvent and moves to SessionTransportUp
    //   - 'ErrorExit':     emits _connectFailEvent and moves to SessionDisconnected
    this.SessionConnecting = new State({
      name:          SessionStateName.CONNECTING,
      parentContext: fsm,
    }, {
      //
      //  handleTransportDestroyed is called in response to the TRANSPORT_DESTROYED
      //  event and if transport creation throws an exception. In both cases there is
      //  no underlying transport and the FSM must move on to the next host.
      //  It is invoked with .call(<caller's this>), so `this` is whatever the caller
      //  passes and must provide transitionTo / transitionToExitPoint.
      handleTransportDestroyed() {
        // clear connectTimer, if it is still running
        fsm.clearConnectTimer();

        // A null host means there is nothing left to try: leave through ErrorExit.
        fsm._currentHost = fsm._hosts.getNextHost();
        if (fsm._currentHost === null) {
          return this.transitionToExitPoint(fsm.SessionConnecting, 'ErrorExit');
        }

        // Optionally wait before the next attempt; the timer feeds CONNECT_WAIT_TIMEOUT
        // back into the FSM, which WaitingForInterConnectTimeout reacts to.
        const { connectWaitTimeInMsecs } = fsm._hosts;
        LOG_TRACE(`Wait time for this host is ${connectWaitTimeInMsecs}`);
        if (connectWaitTimeInMsecs > 0 && !fsm._connectWaitTimer) {
          fsm._connectWaitTimer = setTimeout(() => {
            fsm._connectWaitTimer = null;
            fsm.processEvent(new SessionFSMEvent({ name: SessionEventName.CONNECT_WAIT_TIMEOUT }));
          }, connectWaitTimeInMsecs);
          return this.transitionTo(fsm.WaitingForInterConnectTimeout);
        }
        return this.transitionTo(fsm.WaitingForTransport);
      },
    })
      // Start the connect timer whenever the connecting state is entered.
      .entry(() => {
        fsm.setConnectTimer();
      })
      .entryPoint('DisconnectTransport', function onDisconnectTransport() {
        fsm._hosts.reset({ wasConnected: undefined, disconnected: true });
        //
        // Set the error event, as we have reset the
        // host list with disconnected:true, we always take the
        // errorExit path to DISCONNECTED state after this entryPoint.
        //
        fsm._connectFailEvent = SessionEventCode.DISCONNECTED;
        fsm._connectSuccessEvent = SessionEventCode.DISCONNECTED;
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .entryPoint('ReconnectTransport', function onReconnectTransport() {
        //
        // Set the error event, this is only used
        // if we take the errorExit path
        //
        fsm._connectFailEvent = SessionEventCode.DOWN_ERROR;
        fsm._connectSuccessEvent = SessionEventCode.RECONNECTED_NOTICE;
        // Should we try to reconnect?
        const disconnected = fsm._sessionProperties._reconnectRetries === 0;
        fsm._hosts.reset({ wasConnected: true, disconnected });
        if (!disconnected) {
          // workaround to make sure session state is changed to connecting when
          // application gets the session event
          // positional parameters for SessionEvent constructor
          // make sure we get the args and err setup now as by the time
          // the postEventAction runs currentError will be reset.
          const err = fsm._currentError || {};
          const args = [err.eventText,    // infoString
            err.responseCode,             // responseCode
            err.errorSubcode,             // errorSubcode
            undefined,                    // correlationKey ... N/A
            err.eventReason];             // eventReason
          fsm.setPostEventAction(() => {
            fsm.setConnectTimer();
            fsm.emitSessionEvent(SessionEvent.build(SessionEventCode.RECONNECTING_NOTICE, ...args));
          });
        }
        return this.transitionTo(fsm.DestroyingTransport);
      })
      // NOTE: arrow function, so `this` below is the SessionFSM instance (same object as
      // `fsm`), not the State.
      .initial(() => {
        fsm.clearCurrentError();
        //
        // Set the error event this is only used
        // if we take the errorExit path
        //
        fsm._connectFailEvent = SessionEventCode.CONNECT_FAILED_ERROR;
        fsm._connectSuccessEvent = SessionEventCode.UP_NOTICE;

        // Perform host resolution (guaranteed async; replaces a postEventAction)
        // Do not perform host list operations until this completes (moved to callback)
        fsm._hosts.resolveHosts((err) => {
          // _hosts being falsy here is treated as "session was disposed meanwhile".
          if (!fsm._hosts) {
            LOG_TRACE('Ignoring host filter fail because session was disposed');
            return null;
          }
          if (err) {
            LOG_TRACE('DNS filter failed:', err);
            fsm.setCurrentError({
              errorSubcode: ErrorSubcode.UNRESOLVED_HOSTS,
              eventText:    err,
            });
            fsm._hosts.reset({ disconnected: true });
            return this.processEvent(new SessionFSMEvent({ name: SessionEventName.EXCEPTION }));
          }

          // Now that the host list has been scanned, we can reset and get next host
          fsm._hosts.reset({ wasConnected: false });
          fsm._currentHost = fsm._hosts.getNextHost();
          return this.processEvent(new SessionFSMEvent(
            { name: SessionEventName.DNS_RESOLUTION_COMPLETE }
          ));
        });
        return this.transitionTo(fsm.WaitingForDNS);
      })
      .reaction(SessionEventName.DNS_RESOLUTION_COMPLETE, function onDNSComplete() {
        return this.transitionTo(fsm.WaitingForTransport);
      })
      .reaction(SessionEventName.DISCONNECT, function onDisconnect(/* event */) {
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'DisconnectTransport');
      })
      // The reactions below all record the error and then tear the transport down;
      // DestroyingTransport decides what happens next once TRANSPORT_DESTROYED arrives.
      .reaction(SessionEventName.CONNECT_TIMEOUT, function onConnectTimeout(/* event */) {
        fsm.setCurrentError({
          errorSubcode: ErrorSubcode.TIMEOUT,
          eventText:    'Connect timeout',
        });
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .reaction(SessionEventName.SEND_ERROR, function onSendError(sessionEvent) {
        LOG_INFO(`SEND_ERROR reached SessionConnecting. ${sessionEvent}`);
        fsm.setCurrentError(sessionEvent);
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .reaction(SessionEventName.EXCEPTION, function onException(errEvent) {
        fsm.setCurrentError(errEvent);
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .reaction(SessionEventName.TRANSPORT_DESTROYED, function onTransportDestroyed(sessionEvent) {
        fsm.setCurrentError(sessionEvent);
        LOG_INFO('TRANSPORT_DESTROYED event');
        return this.transitionTo(fsm.DestroyingTransport);
      })
      // Leaving the connecting state cancels both the connect timer and any pending
      // inter-connect wait timer.
      .exit(() => {
        fsm.clearConnectTimer();
        if (fsm._connectWaitTimer) {
          clearTimeout(fsm._connectWaitTimer);
          fsm._connectWaitTimer = null;
        }
      })
      // NOTE: arrow function, so `this.transitionTo` below is invoked on the SessionFSM
      // instance rather than on the State.
      .exitPoint('ConnectedExit', (/* event */) => {
        // Normal exit to TransportUp.
        // We need to notify the application they are connected. Use postEventAction to make sure
        // session state has already changed to TransportUp when application gets the callback.
        // Use _connectSuccessEvent to send RECONNECTED vs UP depending on entry point.
        // The text included in the event is currently different for
        // connect vs reconnect. We should store with as part of _connectSuccessEvent in
        // the entry point.
        fsm.setPostEventAction(() => {
          fsm.emitSessionEvent(SessionEvent.build(fsm._connectSuccessEvent,
                                                  `'${fsm._hosts.currentHostToString()}'`,
                                                  200, 0, null, null));
          // Relieve backpressure that was signalled to the user before the (re)connect.
          if (fsm._userBackpressured) {
            const sessionEvent = SessionEvent.build(SessionEventCode.CAN_ACCEPT_DATA,
                                                    '', null, 0, null, '');
            fsm.emitSessionEvent(sessionEvent);
            fsm._userBackpressured = false;
          }
        });
        // Clear current error as we have successfully connected or
        // reconnected.
        fsm.clearCurrentError();
        return this.transitionTo(fsm.SessionTransportUp);
      })
      .exitPoint('ErrorExit', function onExit(/* event */) {
        // Notify the application they are disconnected. Use postEventAction to make sure session
        // state has already changed to disconnected when application gets the callback.
        fsm.setPostEventAction(() => {
          const err = fsm._currentError || {};
          // positional parameters for SessionEvent constructor
          const args = [err.eventText,    // infoString
            err.responseCode,             // responseCode
            err.errorSubcode,             // errorSubcode
            undefined,                    // correlationKey ... N/A
            err.eventReason];             // eventReason
          fsm.emitSessionEvent(SessionEvent.build(fsm._connectFailEvent, ...args));
        });
        return this.transitionTo(fsm.SessionDisconnected);
      });

    // Sub-state of SessionConnecting that is entered while host resolution is pending
    // (see SessionConnecting's initial transition). It declares no reactions of its own;
    // DNS_RESOLUTION_COMPLETE and EXCEPTION reactions are declared on SessionConnecting.
    this.WaitingForDNS = new State({
      name:          SessionStateName.WAITING_FOR_DNS,
      parentContext: fsm.SessionConnecting,
    });

    // Sub-state of SessionConnecting: asks for the transport session to be destroyed on
    // entry, then waits for TRANSPORT_DESTROYED before moving on to the next host.
    this.DestroyingTransport = new State({
      name:          SessionStateName.DESTROYING_TRANSPORT,
      parentContext: this.SessionConnecting,
    })
      .entry(() => {
        LOG_INFO('Connecting, disposing transport');
        // The connect timer must not fire while the transport is being torn down.
        fsm.clearConnectTimer();
        fsm.destroyTransportSession('Disconnecting session', 0);
      })
      .reaction(SessionEventName.TRANSPORT_DESTROYED, function onTransportDestroyed(sEvent) {
        fsm.setCurrentError(sEvent);
        // Shared next-host logic lives on SessionConnecting; run it with this state as `this`.
        const { handleTransportDestroyed } = fsm.SessionConnecting;
        return handleTransportDestroyed.call(this);
      });

    // Sub-state of SessionConnecting used while the inter-connect wait timer (armed in
    // SessionConnecting.handleTransportDestroyed) is running. When the timer's
    // CONNECT_WAIT_TIMEOUT event arrives, the FSM proceeds to create the next transport.
    this.WaitingForInterConnectTimeout = new State({
      name:          SessionStateName.WAITING_FOR_INTERCONNECT_TIMEOUT,
      parentContext: fsm.SessionConnecting,
    })
      .reaction(SessionEventName.CONNECT_WAIT_TIMEOUT, function onConnectWaitTimeout(/*event*/) {
        return this.transitionTo(fsm.WaitingForTransport);
      });

    // Sub-state of SessionConnecting in which the transport is created via
    // fsm.initTransport(). On success it descends into WaitingForTransportUp; if creation
    // throws, the error is recorded and the next host is tried.
    // NOTE: the initial handler is an arrow function, so `this` inside it is the
    // SessionFSM instance (the constructor's `this`, i.e. `fsm`), not the State. That is
    // why `this.setCurrentError` resolves, and it is also the `this` passed on to
    // handleTransportDestroyed.
    this.WaitingForTransport = new State({
      name:          SessionStateName.WAITING_FOR_TRANSPORT,
      parentContext: fsm.SessionConnecting,
    })
      .initial(() => {
        const errorMessage = 'Cannot establish transport session: creation failed';
        fsm.setConnectTimer();
        fsm.clearCurrentError();
        try {
          fsm.initTransport();
        } catch (e) {
          // Avoid repeating the prefix when the thrown message already equals it.
          this.setCurrentError({
            eventText:    e.message === errorMessage ? errorMessage : `${errorMessage}: ${e.message}`,
            errorSubcode: e.subcode || ErrorSubcode.INTERNAL_ERROR,
            eventReason:  e });
          return fsm.SessionConnecting.handleTransportDestroyed.call(this);
        }
        return this.transitionTo(fsm.WaitingForTransportUp);
      });

    // Sub-state of WaitingForTransport: calls connect() on the transport and waits for
    // TRANSPORT_UP, at which point the client control login is sent.
    // NOTE: entry and initial are arrow functions, so `this` inside them is the SessionFSM
    // instance (same object as `fsm`); `_justEntered` is therefore stored on the FSM.
    this.WaitingForTransportUp = new State({
      name:          SessionStateName.WAITING_FOR_TRANSPORT_UP,
      parentContext: this.WaitingForTransport,
    })
      // This way the code is run only on actual state entry,
      // not on null transitions (staying in this state)
      .entry(() => { this._justEntered = true; })
      .initial(() => {
        // Flag not set by entry: nothing to do, do not call connect() again.
        if (!this._justEntered) { return this; }
        this._justEntered = false;
        const errorMessage = 'Cannot establish transport session: connection failed';
        try {
          const returnCode = fsm._transport.connect();
          // A non-OK return code is turned into an exception so that both failure modes
          // (return code and thrown error) share the catch block below.
          if (returnCode !== TransportLib.TransportReturnCode.OK) {
            throw new OperationError(
              errorMessage,
              ErrorSubcode.CONNECTION_ERROR,
              TransportLib.TransportReturnCode.describe(returnCode)
            );
          }
          LOG_INFO(`Attempting to connect session '${fsm.sessionId}' to ${fsm._hosts.currentHostToString()}`);
        } catch (e) {
          // Avoid repeating the prefix when the thrown message already equals it.
          this.setCurrentError({
            eventText:    e.message === errorMessage ? errorMessage : `${errorMessage}: ${e.message}`,
            errorSubcode: e.subcode || ErrorSubcode.INTERNAL_ERROR,
            eventReason:  e });
          return fsm.SessionConnecting.handleTransportDestroyed.call(this);
        }
        return this;
      })
      // Handled here with an internal transition so the SEND_ERROR reaction declared on
      // SessionConnecting (which destroys the transport) is not used in this state.
      .reaction(SessionEventName.SEND_ERROR, function doNothing(sEvent) {
        LOG_INFO(`SEND_ERROR while waiting for transport up, doing nothing. ${sEvent}`);
        return this.internalTransition(null);
      })
      .reaction(SessionEventName.TRANSPORT_UP, function onTransportUp(sEvent) {
        fsm.sessionId = sEvent.sessionId || '';
        const returnCode = fsm.sendClientCtrlLogin();
        if (returnCode === TransportLib.TransportReturnCode.OK) {
          return this.transitionTo(fsm.WaitingForLogin);
        }
        // The login could not even be sent: record a login failure and tear down.
        const err = {
          eventText:    'Failed to send Client Control Login',
          errorSubcode: ErrorSubcode.LOGIN_FAILURE,
          responseCode: 400,
        };
        fsm.setCurrentError(err);
        return this.transitionTo(fsm.DestroyingTransport);
      });

    // Sub-state of SessionConnecting: the client control login has been sent and the FSM
    // waits for the ClientCtrl response while the client control timer is running.
    this.WaitingForLogin = new State({
      name:          SessionStateName.WAITING_FOR_LOGIN,
      parentContext: this.SessionConnecting,
    })
      .entry(() => {
        fsm.setClientCtrlTimer();
      })
      .reaction(SessionEventName.TRANSPORT_PROTOCOL_CLIENTCTRL, function onClientCtrl(sEvent) {
        const loginMsg = sEvent.smfMsg;
        const response = loginMsg.getResponse();
        const { responseCode } = response;
        fsm._responseCode = responseCode;

        // `failure` is only assigned on the paths that end in DestroyingTransport.
        let failure;
        if (responseCode !== 200) {
          const mappedSubcode = ErrorResponseSubcodeMapper.getErrorSubcode(responseCode,
                                                                           response.responseString);
          // An unmapped response during login is reported as a login failure.
          const errorSubcode = (mappedSubcode === ErrorSubcode.UNKNOWN_ERROR)
            ? ErrorSubcode.LOGIN_FAILURE
            : mappedSubcode;
          LOG_INFO(`Login failed. Subcode: ${errorSubcode} respCode: ${responseCode} ` +
            `respString: ${response.responseString}`);
          failure = {
            responseCode,
            eventText: response.responseString,
            errorSubcode,
          };
        } else if (!fsm.checkNoLocal(loginMsg)) {
          failure = {
            responseCode,
            eventText:    'No Local is not supported by the Solace Message Router',
            errorSubcode: ErrorSubcode.NO_LOCAL_NOT_SUPPORTED,
          };
        } else if (!fsm.checkCompressedSsl(loginMsg)) {
          failure = {
            responseCode,
            eventText:    'Compressed TLS is not supported by the Solace Message Router',
            errorSubcode: ErrorSubcode.COMPRESSED_TLS_NOT_SUPPORTED,
          };
        } else {
          // Login accepted and all capability checks passed.
          fsm.updateReadonlySessionProps(loginMsg);
          return this.transitionTo(fsm.WaitForTransportChange);
        }

        LOG_TRACE('ClientCtrl error', failure);
        fsm.setCurrentError(failure);
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .reaction(SessionEventName.DOWNGRADE_TIMEOUT, function onDowngradeTimeout(/* event */) {
        // The transport may offer a downgrade; anything but an explicit `false` from
        // requestDowngrade counts as accepted.
        const downgradeAccepted = Boolean(fsm._transport.requestDowngrade) &&
          fsm._transport.requestDowngrade('ClientCtrl timeout', ErrorSubcode.TIMEOUT) !== false;
        if (downgradeAccepted) {
          // Don't recreate the transport, just try to connect it
          LOG_TRACE('Downgrade accepted, returning to WaitingForTransportUp');
          return this.transitionTo(fsm.WaitingForTransportUp);
        }
        fsm.setCurrentError({
          eventText:    'ClientCtrl timeout',
          errorSubcode: ErrorSubcode.TIMEOUT,
        });
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .exit(() => {
        fsm.clearClientCtrlTimer();
      });

    // Intermediate state entered after a successful login. It lets the transport factory
    // enable compression and/or shut down TLS.
    // updateTransportCompression() either returns the new transport right away, in which
    // case this state is left immediately, or returns null and hands the new transport to
    // the callback later. No messages can be sent while waiting for that callback.
    this.WaitForTransportChange = new State({
      name:          SessionStateName.WAITING_FOR_TRANSPORT_CHANGE,
      parentContext: this.SessionConnecting,
    })
      .initial(function onInitial() {
        // Deferred path: store the new transport on the FSM and announce completion.
        const onTransportChange = (newTransport) => {
          fsm._transport = newTransport;
          const doneEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_CHANGE_DONE });
          fsm.processEvent(doneEvent);
        };
        const immediateTransport = fsm.updateTransportCompression(onTransportChange);
        if (immediateTransport !== null) {
          fsm._transport = immediateTransport;
          return this.transitionTo(fsm.ReapplyingSubscriptions);
        }
        // Factory needs time, stay in this state.
        return this;
      })
      .reaction(SessionEventName.TRANSPORT_CHANGE_DONE, function onTransportChangeDone() {
        return this.transitionTo(fsm.ReapplyingSubscriptions);
      });

    // Sub-state of SessionConnecting that re-sends the cached subscriptions. The cache keys
    // are copied on entry and cleared again on exit.
    this.ReapplyingSubscriptions = new State({
      name:          SessionStateName.REAPPLYING_SUBSCRIPTIONS,
      parentContext: this.SessionConnecting,
    })
      .entry(() => {
        LOG_INFO('ReapplyingSubscriptions: entry');
        fsm.copySubscriptionCacheKeys();
      })
      .initial(function onInitial() {
        // true  -> everything was sent, wait for the subscription confirm
        // other -> flow controlled while reapplying, wait until data can be accepted again
        const allApplied = fsm.reapplySubscriptions() === true;
        const nextState = allApplied ? fsm.WaitForSubConfirm : fsm.WaitForCanAcceptData;
        return this.transitionTo(nextState);
      })
      .reaction(SessionEventName.SUBSCRIBE_TIMEOUT, function onSubscribeTimeout() {
        fsm.setCurrentError({
          eventText:    'Subscription timeout while reapplying',
          errorSubcode: ErrorSubcode.TIMEOUT,
        });
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .exit(function onExit() {
        fsm.clearSubscriptionCacheKeys();
        return this;
      });

    // Sub-state of ReapplyingSubscriptions: all subscriptions have been re-sent and the FSM
    // waits for the SMP response that confirms them.
    //   - non-200 response: record the error and tear the transport down
    //   - 200 response:     bind the default publisher first if it is waiting, otherwise
    //                       leave SessionConnecting through 'ConnectedExit'
    this.WaitForSubConfirm = new State({
      name:          SessionStateName.WAITING_FOR_SUBCONFIRM,
      parentContext: fsm.ReapplyingSubscriptions,
    })
      .reaction(SessionEventName.TRANSPORT_PROTOCOL_SMP, function onTransportProtocolSMP(sEvent) {
        // process rxMsgObj
        const smfRespHeader = sEvent.smfMsg.smfHeader;
        const subscriptionStr = stripNullTerminate(sEvent.smfMsg.encodedUtf8Subscription);
        const respCode = smfRespHeader.pm_respcode;
        const respText = smfRespHeader.pm_respstr;
        if (respCode !== 200) {
          const errorSubcode = ErrorResponseSubcodeMapper.getErrorSubcode(respCode, respText);
          LOG_INFO(`Waiting for subscription confirmation, got ${respCode} (${errorSubcode}) ` +
                    `'${respText}' on subscription ${subscriptionStr}`);
          fsm.setCurrentError({
            eventText:    respText,
            responseCode: respCode,
            errorSubcode,
          });
          // The teardown state is named DestroyingTransport. The FSM defines no
          // `DestroyTransport` member, so that name would resolve to undefined here.
          return this.transitionTo(fsm.DestroyingTransport);
        }

        if (fsm._session.canConnectPublisher &&
            fsm._defaultPublisher &&
            fsm._defaultPublisher.isBindWaiting()) {
          return this.transitionTo(fsm.WaitingForMessagePublisher);
        }
        return this.transitionToExitPoint(fsm.SessionConnecting, 'ConnectedExit');
      });

    // Sub-state of ReapplyingSubscriptions used when reapplying was flow controlled: the
    // FSM waits for TRANSPORT_CAN_ACCEPT_DATA and then continues reapplying.
    this.WaitForCanAcceptData = new State({
      name:          SessionStateName.WAITING_FOR_CAN_ACCEPT_DATA,
      parentContext: this.ReapplyingSubscriptions,
    })
      .reaction(SessionEventName.TRANSPORT_PROTOCOL_SMP, function onTransportProtocolSMP(sEvent) {
        // process rxMsgObj
        const header = sEvent.smfMsg.smfHeader;
        const subscription = stripNullTerminate(sEvent.smfMsg.encodedUtf8Subscription);
        const { pm_respcode: respCode, pm_respstr: respText } = header;

        // No request-confirm has been sent yet, so an SMP response here is almost
        // certainly an error response to a subscription add. A '200 OK' is unexpected
        // but harmless: log it and stay in this state.
        if (respCode === 200) {
          LOG_INFO(`Unexpected 200 OK response to subscription add for ${subscription}`);
          return this.internalTransition(null);
        }

        const errorSubcode = ErrorResponseSubcodeMapper.getErrorSubcode(respCode, respText);
        LOG_DEBUG(`Waiting for can accept data, got ${respCode} (subcode ${errorSubcode}) ` +
          `'${respText}' on subscription ${subscription}`);
        fsm.setCurrentError({
          eventText:    respText,
          responseCode: respCode,
          errorSubcode,
        });
        return this.transitionTo(fsm.DestroyingTransport);
      })
      .reaction(SessionEventName.TRANSPORT_CAN_ACCEPT_DATA, function onTransportCanAcceptData() {
        if (fsm.reapplySubscriptions() !== true) {
          // still flow controlled while reapplying
          return this.internalTransition(null);
        }
        // all subscriptions applied and waiting for a subscription confirm
        return this.transitionTo(fsm.WaitForSubConfirm);
      });

    // Sub-state of SessionConnecting that waits for the default publisher to finish
    // binding before the session is reported as connected.
    this.WaitingForMessagePublisher = new State({
      name:          SessionStateName.WAITING_FOR_PUBFLOW,
      parentContext: this.SessionConnecting,
    })
      .entry(() => {
        // Tell the publisher the session is up so that flow establishment can begin.
        fsm._defaultPublisher.connect();
        fsm.sendPublisherSessionUpEvent(fsm._defaultPublisher);
      })
      .reaction(SessionEventName.FLOW_UP, function onEvent() {
        LOG_TRACE(`Flow up ${this}`);
        if (fsm._defaultPublisher.isBindWaiting()) {
          LOG_TRACE('Waiting for more publishers');
          return this.internalTransition(null);
        }
        LOG_TRACE('Publisher up');
        return this.transitionToExitPoint(fsm.SessionConnecting, 'ConnectedExit');
      })
      .reaction(SessionEventName.FLOW_FAILED, function onEvent(sEvent) {
        const publisherFailure = {
          eventText:    `Guaranteed Message Publisher Failed: ${sEvent.eventText}`,
          errorSubcode: ErrorSubcode.LOGIN_FAILURE,
        };
        fsm.setCurrentError(publisherFailure);
        return this.transitionTo(fsm.DestroyingTransport);
      });

    // Composite state for a session whose transport is up (parent of FullyConnected).
    //   - DISCONNECT                                  -> SessionDisconnecting
    //   - EXCEPTION / SEND_ERROR / TRANSPORT_DESTROYED -> record the error, clean up the
    //     session and re-enter SessionConnecting through 'ReconnectTransport'
    //   - SMP, CAN_ACCEPT_DATA and CREATE_SUBSCRIBER are handled in place
    this.SessionTransportUp = new State({
      name:          SessionStateName.TRANSPORT_UP,
      parentContext: fsm,
    })
      .entry(function onEntry() {
        fsm.clearConnectTimer(); // clear connectTimer, once we successfully connect
        return this;
      })
      .initial(function onInitial() {
        if (fsm._session.canConnectConsumer) {
          // Inform the subscribers that the session is up and the flow establishment can begin.
          fsm._consumers.flows.forEach(consumer => fsm.sendConsumerSessionUpEvent(consumer));
          // Reconnecting flows need session state notification too.
          fsm._consumers.reconnectingFlows.forEach(
            consumer => fsm.sendConsumerSessionUpEvent(consumer));
        }
        return this.transitionTo(fsm.FullyConnected);
      })
      .reaction(SessionEventName.DISCONNECT, function onDisconnect(/* sEvent */) {
        return this.transitionTo(fsm.SessionDisconnecting);
      })
      .reaction(SessionEventName.EXCEPTION, function onException(sEvent) {
        fsm.setCurrentError(sEvent);
        fsm.cleanupSession();
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'ReconnectTransport');
      })
      .reaction(SessionEventName.SEND_ERROR, function onSendError(sEvent) {
        fsm.setCurrentError(sEvent);
        fsm.cleanupSession();
        LOG_TRACE('Reconnecting transport after SEND_ERROR (not fully connected)');
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'ReconnectTransport');
      })
      .reaction(SessionEventName.TRANSPORT_DESTROYED, function onTransportDestroyed(sEvent) {
        fsm.setCurrentError(sEvent);
        LOG_INFO('Received unsolicited TRANSPORT_DESTROYED event while transport is up');
        LOG_TRACE(`TRANSPORT_DESTROYED details: ${sEvent}`);
        fsm.cleanupSession();
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'ReconnectTransport');
      })
      .reaction(SessionEventName.TRANSPORT_PROTOCOL_SMP, function onTransportProtocolSMP(sEvent) {
        // An SMP response while connected is passed to the subscription-update error
        // handler; the state itself does not change.
        const smfRespHeader = sEvent.smfMsg.smfHeader;
        const subscriptionStr = stripNullTerminate(sEvent.smfMsg.encodedUtf8Subscription);
        const respCode = smfRespHeader.pm_respcode;
        const respText = smfRespHeader.pm_respstr;

        fsm.handleSubscriptionUpdateError(respCode, respText, subscriptionStr, undefined, false);
        return this.internalTransition(null);
      })
      .reaction(SessionEventName.TRANSPORT_CAN_ACCEPT_DATA, function onCanAcceptData(sEvent) {
        // Notify client for republishing
        const sessionEvent = SessionEvent.build(SessionEventCode.CAN_ACCEPT_DATA,
                                                '', null, 0, null, sEvent.toString());
        fsm.emitSessionEvent(sessionEvent);
        // The user has been told; nothing left to relieve on a later reconnect.
        fsm._userBackpressured = false;
        return this.internalTransition(null);
      })
      .reaction(SessionEventName.CREATE_SUBSCRIBER, function onCreateSubscriber(sEvent) {
        // Inform the consumer that the session is up and the flow establishment can begin.
        // NOTE(review): relies on _consumers.add() returning the consumer — confirm.
        const consumer = fsm._consumers.add(sEvent.guaranteedFlowObject);
        fsm.sendConsumerSessionUpEvent(consumer);
        return this;
      })
      .exit(function onExit() {
        fsm.clearKeepAlive();
        return this;
      });

    // Sub-state of SessionTransportUp reached from its initial transition. On entry it
    // switches the failure event to DOWN_ERROR (used by SessionConnecting's 'ErrorExit')
    // and schedules the keep-alive, which SessionTransportUp's exit handler clears again.
    this.FullyConnected = new State({
      name:          SessionStateName.FULLY_CONNECTED,
      parentContext: fsm.SessionTransportUp,
    })
      .entry(() => {
        fsm._connectFailEvent = SessionEventCode.DOWN_ERROR;
        fsm.scheduleKeepAlive();
      });

    // Top-level state of a session that is not connected.
    //   - DISCONNECT: nothing to tear down, just report DISCONNECTED after the event
    //   - CONNECT:    start connecting
    //   - EXCEPTION:  ignored
    this.SessionDisconnected = new State({
      name:          SessionStateName.DISCONNECTED,
      parentContext: this,
    })
      .reaction(SessionEventName.DISCONNECT, function onDisconnect(/* sEvent */) {
        const emitDisconnected = () => {
          const disconnectedEvent = SessionEvent.build(SessionEventCode.DISCONNECTED);
          fsm.emitSessionEvent(disconnectedEvent);
        };
        fsm.setPostEventAction(emitDisconnected);
        return this.internalTransition(null);
      })
      .reaction(SessionEventName.CONNECT, function onConnect(/* sEvent */) {
        return this.transitionTo(fsm.SessionConnecting);
      })
      .reaction(SessionEventName.EXCEPTION, function onException(/* sEvent */) {
        // Already disconnected: swallow the exception without changing state.
        return this.internalTransition(null);
      });
    this.SessionDisconnecting = new State({
      name:          SessionStateName.DISCONNECTING,
      parentContext: fsm,
    })
      .initial(() => {
        LOG_INFO(`Disconnecting session ${fsm}`);
        return this.transitionTo(fsm.DisconnectingFlows);
      })
      .reaction(SessionEventName.DISCONNECT, function onDisconnect(/* sEvent */) {
        return this.internalTransition(null);
      })
      .reaction(SessionEventName.EXCEPTION, function onException(errEvent) {
        //
        // As we are in disconnecting state we have initiated the
        // disconnect and need to preserve the error subcode that
        // we set before destroying the transport. The transport, having
        // no error, will usually return a subcode of zero here.
        //
        // This only changes the eventCode, not the subcode. Other properties
        // are preserved.
        //
        fsm.setCurrentError({ errEvent });
        fsm.cleanupSession();
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'DisconnectTransport');
      })
      .reaction(SessionEventName.TRANSPORT_DESTROYED, function onTransportDestroyed(/* sEvent */) {
        LOG_INFO('Received unsolicited TRANSPORT_DESTROYED while disconnecting transport');
        fsm.cleanupSession();
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'DisconnectTransport');
      });

    this.DisconnectingFlows = new State({
      name:          SessionStateName.DISCONNECTING_FLOWS,
      parentContext: fsm.SessionDisconnecting,
    }, {
      // Get all flows to disconnect. This will need to be called at least once.
      gatherPendingFlows() {
        const { MessageConsumerEventName } = ConsumerLib;

        // Don't re-enter synchronously to look for new flows.
        // Flows disconnect synchronously when they are already disconnected.
        // If this is the case, they throw; they do not emit an event.
        // If a consumer flow is up, it will immediately emit a BIND_WAITING event and we
        // handle that.
        // Since we know that flow#_disconnectSession neither creates a new flow
        // nor calls a user callback that could do so, we need not look for new
        // flows that were created.
        assert(!this.isGathering);
        this.isGathering = true;

        const adaptedListenForDestroy = (flow, installfn, downEvents) => {
          assert(flow, 'Trying to listen to undefined flow');
          // If we already know about this flow, skip it.
          if (this.known.has(flow)) return;
          LOG_TRACE(`Adding pending flow ${flow}`);
          this.known.add(flow);
          this.pending.add(flow);
          const onFlowDown = () => {
            LOG_TRACE(`Removing flow from pending ${flow}`);
            downEvents.forEach(event => flow._removeListener(event, onFlowDown));
            this.pending.delete(flow);
            if (!this.isGathering) this.checkPendingFlows();
          };
          downEvents.forEach(event => installfn.call(flow, event, onFlowDown));
          try {
            flow._disconnectSession(); // Session has been disconnected by user
          } catch (ex) {
             // Synchronously down
            LOG_TRACE('Flow disconnect threw');
            LOG_TRACE(ex);
            onFlowDown();
          }
        };

        // See SOL-13556: sending CloseFlow makes it impossible to recover the flow.
        // This was the code that sent CloseFlow. I'm leaving it here temporarily
        // in case we want a clean shutdown that waits for acks.
        /*
        const { MessagePublisherEventName } = PublisherLib;
        if (fsm._defaultPublisher) {
          adaptedListenForDestroy(fsm._defaultPublisher,
                                  fsm._defaultPublisher.once,
                                  [MessagePublisherEventName.DOWN]);
        }
        */
        if (fsm._consumers) {
          fsm._consumers.flows.forEach((flow) => {
            adaptedListenForDestroy(flow,
                                    flow._once,
                                    [
                                      MessageConsumerEventName.DOWN,
                                      MessageConsumerEventName.DOWN_ERROR,
                                    ]);
          });
        }

        this.isGathering = false;
      },
      // This function is called whenever a flow comes down, whether sync or async.
      // The body is wrapped in a basic debounce: if we are re-entering (synchronously),
      // no new flows will be added.
      checkPendingFlows() {
        LOG_TRACE('Waiting for disconnects on', this.pending);
        if (this.pending.size === 0) {
          // We are out of pending flows, but look for new flows that were just added
          this.gatherPendingFlows();
          // Were any flows just added?
          if (this.pending.size === 0) {
            this.proceed();
          }
        }
      },
      proceed() {
        // All done!
        // Clear flow sets so they can be disposed
        this.known = null;
        this.pending = null;
        LOG_INFO('All flows disconnected');
        fsm.processEvent(new FsmEvent({ name: SessionEventName.FLOWS_DISCONNECTED }));
      },
    })
      .entry(function onEntry() {
        this.known = new Set();
        this.pending = new Set();
        this.checkPendingFlows();
      })
      .reaction(SessionEventName.FLOWS_DISCONNECTED, function onFlowsDisconnected() {
        return this.transitionTo(fsm.FlushingTransport);
      });

    this.FlushingTransport = new State({
      name:          SessionStateName.FLUSHING_TRANSPORT,
      parentContext: fsm.SessionDisconnecting,
    }, {
      flushTransport() {
        fsm.cleanupSession();
        fsm.flushTransportSession(() => this.onTransportFlushed());
        this.sessionId = null;
      },
      onTransportFlushed() {
        fsm.processEvent(new FsmEvent({ name: SessionEventName.TRANSPORT_FLUSHED }));
      },
    })
      .entry(function onEntry() {
        LOG_INFO('Flushing transport');
        this.flushTransport();
      })
      .reaction(SessionEventName.TRANSPORT_FLUSHED, function onTransportFlushed() {
        LOG_INFO('Handle Transport Flushed');
        return this.transitionToEntryPoint(fsm.SessionConnecting, 'DisconnectTransport');
      });
  }

  /**
   * @param {Destination} destination The topic to add
   * @private
   */
  addToSubscriptionCache(destination) {
    if (Check.nothing(destination) || !this._subscriptionCache) {
      return;
    }

    const { LOG_DEBUG } = this.logger;
    const key = destination.name;
    if (this._subscriptionCache[key] === null ||
        this._subscriptionCache[key] === undefined) {
      LOG_DEBUG(`Cache subscription ${key}`);
      this._subscriptionCache[key] = destination;
      LOG_DEBUG('Increment cache count');
      this._subscriptionCacheCount++;
    } else {
      LOG_DEBUG(`Cache subscription ${key}`);
      this._subscriptionCache[key] = destination;
    }
  }

  /**
   * @param {String} correlationTag The tag of the request to cancel
   * @returns {CorrelatedRequest} The cancelled request
   * @private
   */
  cancelOutstandingCorrelatedReq(correlationTag) {
    if (Check.nothing(correlationTag) || !this._correlatedReqs) {
      return null;
    }
    const req = this._correlatedReqs[correlationTag];
    if (req === null || req === undefined) {
      return null;
    }

    const { LOG_DEBUG, LOG_ERROR } = this.logger;
    LOG_DEBUG(`Cancel outstanding ctrl request correlationTag=${correlationTag
      }`);
    if (req.timer) {
      clearTimeout(req.timer);
      req.timer = null;
    }
    try {
      const result = delete this._correlatedReqs[correlationTag];
      if (!result) {
        LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`);
      }
    } catch (e) {
      LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`, e);
    }
    return req;
  }


  /**
   * @private
   */
  cleanupSession() {
    const { LOG_INFO } = this.logger;
    LOG_INFO('Clean up session');

    const {
      ConsumerFSMEvent,
      ConsumerFSMEventNames,
    } = ConsumerLib;

    if (this._correlatedReqs) {
      Object.keys(this._correlatedReqs).forEach(key =>
        this.cancelOutstandingCorrelatedReq(key));
    }

    this.clearConnectTimer();
    this.clearClientCtrlTimer();
    this.clearKeepAlive();
    this._consumers.flows.forEach((consumer) => {
      consumer.processFSMEvent(
        new ConsumerFSMEvent({ name: ConsumerFSMEventNames.SESSION_DOWN })
      );
    });
    // Reconnecting flows need session state notification too.
    this._consumers.reconnectingFlows.forEach((consumer) => {
      consumer.processFSMEvent(
        new ConsumerFSMEvent({ name: ConsumerFSMEventNames.SESSION_DOWN })
      );
    });
    if (this._defaultPublisher) {
      this._defaultPublisher.processFSMEvent(
        new PublisherLib.PublisherFSMEvent({
          name: PublisherLib.PublisherFSMEventNames.SESSION_DOWN,
        })
      );
    }
    this._session.cleanupSession();
  }

  /**
   * @private
   */
  clearClientCtrlTimer() {
    if (!this._clientCtrlTimer) {
      return;
    }

    clearTimeout(this._clientCtrlTimer);
    this._clientCtrlTimer = null;
  }


  /**
   * @private
   */
  clearConnectTimer() {
    if (!this._connectTimer) {
      return;
    }

    clearTimeout(this._connectTimer);
    this._connectTimer = undefined;
  }

  /**
   * Cancel keep alive task
   * @private
   */
  clearKeepAlive() {
    const { LOG_INFO } = this.logger;

    if (this._keepAliveTimer) {
      LOG_INFO('Cancel keepalive timer');
      clearInterval(this._keepAliveTimer);
      this._keepAliveTimer = null;
    }

    this.resetKeepAliveCounter();
  }


  /**
   * @param {ClientCtrlMessage} clientCtrlMsg The message to parse
   * @returns {Boolean} true if No Local is supported by the router
   * @private
   */
  checkNoLocal(clientCtrlMsg) {
    let noLocalSupported = true;
    if (this._sessionProperties.noLocal === true) {
      const caps = clientCtrlMsg.getRouterCapabilities();
      if (!caps) {
        noLocalSupported = false;
      } else {
        // Guard for undefined OR non-boolean capability
        noLocalSupported = (typeof caps[CapabilityType.NO_LOCAL] === 'boolean') ? caps[CapabilityType.NO_LOCAL] : false;
      }
    }
    return noLocalSupported;
  }

  /**
   * Check against an odd router version which OKs logins with TLS downgrade to compression,
   * but does not actually support it, and just downgrades to plain text instead.
   * @param {ClientCtrlMessage} clientCtrlMsg The message to parse
   * @returns {Boolean} false if router should have rejected login for unsupported compressed TLS.
   * @private
   */
  checkCompressedSsl(clientCtrlMsg) {
    const { LOG_TRACE } = this.logger;
    if (this._compressedTLS) {
      const caps = clientCtrlMsg.getRouterCapabilities();
      if (!caps || typeof caps[CapabilityType.COMPRESSED_SSL] !== 'boolean') {
        LOG_TRACE('Compressed SSL capability missing.');
        return false;
      }
      return caps[CapabilityType.COMPRESSED_SSL] === true;
    }
    return true;
  }

  /**
   * Check destination against router capablilities
   * @param {Destination} destination to verify
   * @returns {?solace.OperationError} error for caller to throw otherwise null
   * @private
   */
  checkSessionDestinationCapability(destination) {
    let error = null;
    if (destination && destination.getType()) {
      if (destination.getSubscriptionInfo()
          && (destination.getSubscriptionInfo().isShare
              || destination.getSubscriptionInfo().isNoExport)
          && !this._session.isCapable(CapabilityType.SHARED_SUBSCRIPTIONS)) {
        error = new OperationError(
          'Shared subscriptions are not allowed by router for this client',
          ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED,
          null);
      }
    }
    return error;
  }

  /**
   * Clears the current error exit information for the FSM by resetting
   * `_currentError` to `null`.
   */
  clearCurrentError() {
    this._currentError = null;
  }

  /**
   * Drops the snapshot of subscription cache keys by resetting
   * `_subscriptionCacheKeys` to `null`. The subscription cache itself is not touched.
   * @private
   */
  clearSubscriptionCacheKeys() {
    this._subscriptionCacheKeys = null;
  }

  /**
   * @private
   */
  copySubscriptionCacheKeys() {
    // reapply subscriptions if applicable
    this.clearSubscriptionCacheKeys();
    this._subscriptionCacheKeys = Object.keys(this._subscriptionCache || {});

    // Add the P2P Inbox subscription, so the subscriptionCache always has
    // at least one subscription.
    const p2pTopic = P2PUtil.getP2PTopicSubscription(this._sessionProperties.p2pInboxBase);
    this._subscriptionCacheKeys.push(p2pTopic);
  }

  createMessagePublisher() {
    const { LOG_DEBUG } = this.logger;
    if (!this._sessionProperties.publisherProperties.enabled) {
      LOG_DEBUG('Publisher is disabled');
      return;
    }

    const {
      MessagePublisher,
      MessagePublisherEventName,
    } = PublisherLib;
    const publisher = new MessagePublisher({
      properties:              this._sessionProperties.publisherProperties,
      sessionInterfaceFactory: this._flowInterfaceFactory,
    });
    publisher.on(
      MessagePublisherEventName.UP,
      () => this.processEvent(new SessionFSMEvent(
        { name: SessionEventName.FLOW_UP },
        { guaranteedFlowObject: publisher })));
    publisher.on(
      MessagePublisherEventName.CONNECT_FAILED_ERROR,
      event => this.processEvent(
        new SessionFSMEvent(
          { name: SessionEventName.FLOW_FAILED },
          {
            guaranteedFlowObject: publisher,
            event,
            eventText:            event.description,
          })));

    publisher.on(MessagePublisherEventName.REJECTED_MESSAGE, (message, ctrlMessage) => {
      const header = ctrlMessage.smfHeader;
      const respCode = header.pm_respcode;
      const respText = header.pm_respstr;
      const errorSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(respCode, respText);
      const event = SessionEvent.build(SessionEventCode.REJECTED_MESSAGE_ERROR,
                                       respText,
                                       respCode,
                                       errorSubcode,
                                       message.getCorrelationKey());
      event['message'] = message; // eslint-disable-line dot-notation
      this.emitSessionEvent(event);
    });
    publisher.on(MessagePublisherEventName.ACKNOWLEDGED_MESSAGE, (message) => {
      const event = SessionEvent.build(SessionEventCode.ACKNOWLEDGED_MESSAGE,
                                       'Message(s) acknowledged',
                                       undefined,
                                       0,
                                       message.getCorrelationKey());
      event['message'] = message; // eslint-disable-line dot-notation
      this.emitSessionEvent(event);
    });
    publisher.on(MessagePublisherEventName.FLOW_NAME_CHANGED, (data) => {
      const { messages, count } = data;
      if (count > 0) {
        const event = SessionEvent.build(SessionEventCode.REPUBLISHING_UNACKED_MESSAGES,
                                         `Republishing ${count} messages due to ` +
          'Guaranteed Message Publisher failed to reconnect');
        event['messages'] = messages; // eslint-disable-line dot-notation
        event['count'] = count; // eslint-disable-line dot-notation
        this.emitSessionEvent(event);
      }
    });
    publisher.on(MessagePublisherEventName.CAN_SEND, () => {
      this.emitSessionEvent(
        SessionEvent.build(SessionEventCode.CAN_ACCEPT_DATA,
                           `${publisher} window is now open and can send`));
    });
    publisher.on(MessagePublisherEventName.GUARANTEED_MESSAGING_DOWN, () => {
      this.emitSessionEvent(
        SessionEvent.build(
          SessionEventCode.GUARANTEED_MESSAGE_PUBLISHER_DOWN,
          'Guaranteed Message Publishing shut down'));
    });
    this._defaultPublisher = publisher;
  }

  sendConsumerSessionUpEvent(consumer) {
    const {
      ConsumerFSMEvent,
      ConsumerFSMEventNames,
    } = ConsumerLib;
    const isAD = this._session.canConnectConsumer;
    const event = new ConsumerFSMEvent({
      name: isAD
        ? ConsumerFSMEventNames.SESSION_UP
        : ConsumerFSMEventNames.SESSION_UP_NO_AD,
    });
    event.guaranteedFlowObject = consumer;
    consumer.processFSMEvent(event);
  }

  sendPublisherSessionUpEvent(publisher) {
    const isAD = this._session.canConnectPublisher;
    const event = new PublisherLib.PublisherFSMEvent({
      name: isAD
        ? PublisherLib.PublisherFSMEventNames.SESSION_UP
        : PublisherLib.PublisherFSMEventNames.SESSION_UP_NO_AD,
    });
    event.guaranteedFlowObject = publisher;
    publisher.processFSMEvent(event);
  }

  /**
   * Creates a Guaranteed Messaging Subscriber.
   *
   * @param {Object|MessageConsumerProperties} properties Properties for the flow.
   *  Expected fields: {@link solace.MessageConsumerProperties}
   * @returns {solace.MessageConsumer} The newly constructed consumer
   * @private
   */
  createMessageConsumer(properties) {
    const { MessageConsumer } = ConsumerLib;
    const consumer = new MessageConsumer({
      properties,
      sessionInterfaceFactory: this._flowInterfaceFactory,
    });
    // use the consumer properties instead of properties to ensure the topicEndpointSubscription
    // has a Topic object
    const consumerProperties = consumer.getProperties();
    if (consumerProperties.topicEndpointSubscription) {
      //check topic supported
      const error =
          this.checkSessionDestinationCapability(consumerProperties.topicEndpointSubscription);
      if (error) {
        throw error;
      }
    }

    const { LOG_WARN } = this.logger;

    // check for whether the broker session supports the capabilities
    const settlementOutcomes = consumerProperties.requiredSettlementOutcomes;
    if (settlementOutcomes && settlementOutcomes.length > 0) {
      // check the capability bit for settlement outcome (NACK) support or throw OperationError()
      const isNackRequired = settlementOutcomes.some(v => v === MessageOutcome.FAILED || MessageOutcome.REJECTED);
      if(isNackRequired && !this._session.isCapable(CapabilityType.AD_APP_ACK_FAILED)) {
        const outcomeNotSupportedError = `Session.capabilitySettlementOutcomeNotSupported: [ ${
          settlementOutcomes.map(v => `solace.MessageOutcome.${MessageOutcome.nameOf(v)}`).join(', ')
        } ]`;
        // log the operation error as a Warning
        LOG_WARN(outcomeNotSupportedError);
        // then throw the error here
        throw new OperationError(outcomeNotSupportedError, ErrorSubcode.INVALID_OPERATION, null);
      }
    }
    const info = { guaranteedFlowObject: consumer };
    // Will be added to the guaranteed flow collection by dispatching
    this.processEvent(
      new SessionFSMEvent({ name: SessionEventName.CREATE_SUBSCRIBER }, info)
    );
    return consumer;
  }

/**
   * Creates a Queue Browser.
   *
   * @param {Object|QueueBrowserProperties} properties Properties for the queue browser.
   *  Expected fields: {@link solace.QueueBrowserProperties}
   * @returns {solace.QueueBrowser} The newly constructed queue browser
   * @private
   */
  createQueueBrowser(properties) {
    const { MessageConsumerAcknowledgeMode, QueueBrowser } = ConsumerLib;
    const { LOG_DEBUG } = this.logger;

    LOG_DEBUG(`Creating queue browser with properties:  ${properties}`);

    const consumerProperties = {};
    consumerProperties.queueDescriptor = properties.queueDescriptor;
    consumerProperties.acknowledgeMode = MessageConsumerAcknowledgeMode.CLIENT;
    consumerProperties.browser = true;

    // Optional properties
    if (Object.prototype.hasOwnProperty.call(properties, 'connectTimeoutInMsecs')) {
      consumerProperties.connectTimeoutInMsecs = properties.connectTimeoutInMsecs;
    }
    if (Object.prototype.hasOwnProperty.call(properties, 'connectAttempts')) {
      consumerProperties.connectAttempts = properties.connectAttempts;
    }
    if (Object.prototype.hasOwnProperty.call(properties, 'windowSize')) {
      consumerProperties.windowSize = properties.windowSize;
    }
    if (Object.prototype.hasOwnProperty.call(properties, 'transportAcknowledgeTimeoutInMsecs')) {
      consumerProperties.transportAcknowledgeTimeoutInMsecs =
        properties.transportAcknowledgeTimeoutInMsecs;
    }
    if (Object.prototype.hasOwnProperty.call(properties, 'transportAcknowledgeThresholdPercentage')) {
      consumerProperties.transportAcknowledgeThresholdPercentage =
        properties.transportAcknowledgeThresholdPercentage;
    }

    const consumer = this.createMessageConsumer(consumerProperties);
    const browser = new QueueBrowser(consumer);
    return browser;
  }

  /**
   * @param {String} msg The status message for the operation
   * @param {Subcode} subcode The subcode reason for the operation
   * @private
   */
  destroyTransportSession(msg, subcode) {
    if (Check.nothing(this._transport)) {
      // Just send the event
      this.processEvent(
        new SessionFSMEvent({ name: SessionEventName.TRANSPORT_DESTROYED })
      );
      return;
    }
    const { LOG_INFO, LOG_ERROR } = this.logger;
    LOG_INFO('Destroy transport session');

    const returnCode = this._transport.destroy(msg, subcode);
    this._smfClient = null;

    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      LOG_ERROR(`Failed to destroy transport session, return code: ${
        TransportLib.TransportReturnCode.describe(returnCode)}`);
    }
  }


  /**
   * Release all resources associated with the session.
   * @private
   */
  disposeInternal() {
    if (this._disposed) {
      return;
    }

    const operations = {
      'transport': () => {
        this.destroyTransportSession('Disposing', 0);
        this._transport = null;
        this._smfClient = null;
      },
      'session': () => {
        this.cleanupSession();
        this._session = null;
        this._sessionProperties = null;
        this._correlatedReqs = null;
        this._flowInterfaceFactory = null;
      },
      'statistics': () => {
        if (this._sessionStatistics) {
          this._sessionStatistics.resetStats();
          this._sessionStatistics = null;
        }
        this._kaStats = null;
      },
      'subscription cache': () => {
        if (this._subscriptionCache) {
          Object.keys(this._subscriptionCache).forEach(
            key => this.removeFromSubscriptionCache(key)
          );
          this._subscriptionCache = null;
        }
        this.clearSubscriptionCacheKeys();
        this._subscriptionCacheCount = 0;
      },
      'MessagePublishers': () => {
        if (this._defaultPublisher) {
          this._defaultPublisher.dispose();
          this._defaultPublisher = null;
        }
      },
      'MessageConsumers': () => {
        this._consumers.disposeAll();
        this._consumers = null;
      },
      'host list': () => {
        this._currentHost = null;
        this._hosts = null;
      },
    };

    Object.keys(operations).forEach((operationKey) => {
      const { LOG_TRACE, LOG_INFO } = this.logger;
      const operation = operations[operationKey];
      try {
        LOG_TRACE(`Dispose: ${operationKey}`);
        operation();
        LOG_TRACE(`Dispose: ${operationKey} succeeded`);
      } catch (ex) {
        LOG_INFO(`Dispose: ${operationKey} failed:`, ex, '...continuing');
      }
    });
    this._disposed = true;
  }

  /**
   * Forwards a session event to the owning session via its `sendEvent` method.
   *
   * @param {SessionEvent} event The event to hand to the session
   */
  emitSessionEvent(event) {
    // Don't log here; callee does it
    this._session.sendEvent(event);
  }

  /**
   * @param {Number} correlationTag The correlation tag
   * @param {function} reqTimeoutCb The timeout callback
   * @param {Number} reqTimeout The timeout in milliseconds
   * @param {Object} correlationKey The correlation key
   * @param {function} respRecvCallback The success callback
   * @private
   */
  enqueueOutstandingCorrelatedReq(correlationTag,
                                  reqTimeoutCb,
                                  reqTimeout,
                                  correlationKey,
                                  respRecvCallback) {
    if (Check.nothing(correlationTag)) {
      return;
    }

    const { LOG_INFO } = this.logger;
    LOG_INFO(`Enqueue outstanding ctrl request correlationTag=${correlationTag}`);
    let timer = null;
    if (reqTimeoutCb) {
      timer = setTimeout(reqTimeoutCb, reqTimeout || this._sessionProperties.readTimeoutInMsecs);
    }

    const outstandingReq = new CorrelatedRequest(correlationTag,
      timer,
      correlationKey,
      respRecvCallback);
    this._correlatedReqs[correlationTag] = outstandingReq;
  }

  /**
   * @param {String} errorEventText Description of the error
   * @param {ErrorSubcode} errorSubcode Subcode for the error
   * @param {String} [eventReason] The reason for the error
   * @returns {undefined}
   * @memberof SessionFSM
   * @private
   */
  errorInFsm(errorEventText, errorSubcode, eventReason = null) {
    const { LOG_INFO } = this.logger;
    const sEvent = new SessionFSMEvent({ name: SessionEventName.EXCEPTION });
    LOG_INFO(`Handling error in FSM: ${errorEventText} ${eventReason && eventReason.stack}`);
    this.setCurrentError({
      eventText: errorEventText,
      errorSubcode,
      eventReason,
    });
    return this.processEvent(sEvent);
  }

  flushTransportSession(callback) {
    if (this._transport) {
      this._transport.flush(callback);
    } else {
      callback();
    }
  }

  /**
   * Obtains the next correlation tag from the SMF client
   * (`_smfClient.nextCorrelationTag()`).
   *
   * NOTE(review): `_smfClient` is set to `null` when the transport is destroyed or the
   * FSM is disposed, so this throws if called while no SMF client is assigned.
   *
   * @returns {Number} The next correlation tag
   * @private
   */
  getCorrelationTag() {
    return this._smfClient.nextCorrelationTag();
  }

  /**
   * Get current state name
   * @returns {String} The name of the current state, or SessionStateName.DISPOSED if the
   *  FSM is terminated
   * @private
   */
  getCurrentStateName() {
    const currentState = this.getCurrentState();

    if (!currentState) return null;
    if (currentState === this.getFinalState()) return SessionStateName.DISPOSED;
    return this.getCurrentState().getName();
  }

  /**
   * Returns the value of a given {@link StatType}.
   *
   * @param {StatType} statType The statistic to query.
   * @returns {?Number} The value for the given stat, if available
   * @private
   */
  getStat(statType) {
    if (this._sessionStatistics === undefined) {
      return undefined;
    }
    //
    // TX_TOTAL_DATA_MSGS and TX_TOTAL_DATA_BYTES are summary counters
    // These stats are calculated as the sum of DIRECT/PERSISTENT/NONPERSISTENT
    // counters.  This is by design since day one.  As such they do not include
    // the count of redelivered messages/bytes as TX_PERSISTENT_MSGS and TX_NONPERSISTENT_MSGS
    // only includes the counts of messages successfully delivered. This is not consistent
    // with receive stats but it is what it is. This definition is consistent with the other
    // existing APIs (CCSMP and JCSMP).
    //
    if (statType === StatType.TX_TOTAL_DATA_MSGS) {
      return this._sessionStatistics.getStat(StatType.TX_DIRECT_MSGS) +
        this._sessionStatistics.getStat(StatType.TX_PERSISTENT_MSGS) +
        this._sessionStatistics.getStat(StatType.TX_NONPERSISTENT_MSGS);
    } else if (statType === StatType.TX_TOTAL_DATA_BYTES) {
      return this._sessionStatistics.getStat(StatType.TX_DIRECT_BYTES) +
        this._sessionStatistics.getStat(StatType.TX_PERSISTENT_BYTES) +
        this._sessionStatistics.getStat(StatType.TX_NONPERSISTENT_BYTES);
    }
    return this._sessionStatistics.getStat(statType);
  }

  /**
   * Gets a transport session information string.
   * This string is informative only, and applications should not attempt to parse it.
   *
   * @returns {String} The current status of the transport
   */
  getTransportInfo() {
    if (Check.nothing(this._transport)) {
      return 'Not connected.';
    }
    return this._transport.getInfoStr();
  }

  /**
   * Handle an AD CTRL message given the message, its header, and the previously-identified
   * candidate flow, which is used if the message has no correlation tag.
   *
   * @param {AdProtocolMessage} message The message to handle
   * @param {SMFHeader} header The header of the message
   * @returns {SessionFSM} This FSM.
   * @private
   */
  handleADCtrlMessage(message, header) {
    const flowId = message.getFlowId();
    const respText = header.pm_respstr;
    const correlationTag = header.pm_corrtag;
    const { LOG_INFO, LOG_DEBUG, LOG_WARN } = this.logger;

    if (correlationTag) {
      // Correlation tag is non-null
      // Session handles correlated request-reply
      this.updateRxStats(message);

      LOG_INFO(`Handle SMF response for correlationTag ${correlationTag}`);
      // find matching correlationTag to cancel timer
      const cancelledRequest = this.cancelOutstandingCorrelatedReq(correlationTag);
      if (Check.nothing(cancelledRequest)) {
        return this.errorInFsm(`Cannot find matching request for response: ${respText}`,
                               ErrorSubcode.INTERNAL_ERROR);
      }

      if (cancelledRequest.respRecvdCallback) {
        // call callback referenced by cancelledRequest
        // login or update property
        cancelledRequest.respRecvdCallback(message, cancelledRequest);
        return this;
      }

      LOG_DEBUG(`Dropping ADCTRL message due to mismatched correlation tag ${correlationTag}`);
      this.incStat(StatType.RX_REPLY_MSG_DISCARD);
      return this;
    }

    // Unsolicited control message
    let flow;
    const msgType = message.msgType;
    const { SMFAdProtocolMessageType } = SMFLib;
    switch (msgType) {
      case SMFAdProtocolMessageType.CLIENTACK:
      case SMFAdProtocolMessageType.CLIENTNACK:
      case SMFAdProtocolMessageType.CLOSEPUBFLOW:
        if (this._defaultPublisher.flowId === flowId) {
          flow = this._defaultPublisher;
        } // else drop.
        break;
      default:
        flow = this._consumers.getFlowById(flowId);
    }

    if (flow && !flow.disposed) {
      // Found a matching flow
      this.updateRxStats(message, flow);
      flow.handleUncorrelatedControlMessage(message);
      return this;
    }

    // No matching flow
    const response = message.getResponse();
    const rc = response ? `"${response.responseCode} ${response.responseString}" ` : '';
    LOG_WARN(
      `Dropping ADCTRL.${SMFLib.SMFAdProtocolMessageType.describe(message.msgType)
      } ${rc}for unknown flow ${flowId}`
    );
    this.incStat(StatType.RX_DISCARD_NO_MATCHING_CONSUMER);

    return this;
  }

  /**
   * @param {solace.Message} message The AD data message to handle
   * @param {SMFHeader} header The header for the message
   * @returns {MessageConsumer} The flow that handled the message, or `null`
   * @private
   */
  handleADTrMessage(message, header) {
    const { LOG_DEBUG } = this.logger;
    const flowId = header.pm_ad_flowid;
    // Get a flow from that ID if possible.
    const flow = this._consumers.getFlowById(flowId);
    if (!flow || flow.disposed) {
      LOG_DEBUG('Dropped incoming AD message for ' +
                `${flow ? 'disposed' : 'unknown'} flow ID ${flowId}`);
      this.updateRxStats(message, this._sessionStatistics);
      this.incStat(StatType.RX_DISCARD_NO_MATCHING_CONSUMER);
      return null;
    }
    this.updateRxStats(message, flow);
    flow.handleDataMessage(message);
    return flow;
  }

  /**
   * @param {String} correlationTag The correlation tag for the timed out subscription request
   * @param {String} [timeoutMsg] The reason for the timeout
   * @private
   */
  handleApiSubscriptionTimeout(correlationTag, timeoutMsg) {
    // remove request from queue
    if (this._correlatedReqs[correlationTag] === undefined ||
        this._correlatedReqs[correlationTag] === null) {
      return;
    }

    const { LOG_INFO, LOG_ERROR } = this.logger;
    LOG_INFO(`${timeoutMsg || 'Subscription timeout'} for correlationTag=${correlationTag}`);
    try {
      const result = delete this._correlatedReqs[correlationTag];
      if (!result) {
        LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`);
      }
    } catch (e) {
      LOG_ERROR(`Cannot delete ctrl request ${correlationTag}, exception: ${e.message}`);
    }
  }

  /**
   * @param {ClientCtrlMessage} message The message to handle
   * @param {SMFHeader} header The header from the message
   * @returns {undefined}
   * @private
   */
  handleClientCtrlMessage(message, header) {
    let correlationTag;
    const { LOG_INFO } = this.logger;
    this.updateRxStats(message);
    if (message.msgType === SMFLib.SMFClientCtrlMessageType.LOGIN) {
      // Currently, login requests don't use a correlation tag.
      // Using here a fake internal one to match the request.
      correlationTag = TransportLib.SMFClient.SMF_CLIENTCTRL_LOGIN_FAKE_CORRELATIONTAG;
      LOG_INFO('Handle SMF response for ClientCTRL Login');
    } else {
      correlationTag = header.pm_corrtag;
      LOG_INFO(`Handle SMF response for correlationTag ${correlationTag}`);
    }
    // find matching correlationTag to cancel timer
    const cancelledRequest = this.cancelOutstandingCorrelatedReq(correlationTag);
    if (Check.nothing(cancelledRequest)) {
      const respText = header.pm_respstr;
      return this.errorInFsm(`Cannot find matching request for response: ${respText}`,
                             ErrorSubcode.INTERNAL_ERROR);
    } else if (cancelledRequest.respRecvdCallback) {
      // call callback referenced by cancelledRequest
      // login or update property
      return cancelledRequest.respRecvdCallback(message);
    }

    LOG_INFO(`Dropping ClientCtrl message due to mismatched correlation tag ${correlationTag}`);
    return this.incStat(StatType.RX_REPLY_MSG_DISCARD);
  }

  /**
   * @param {ClientCtrlMessage} clientCtrlMsg The message to handle
   * @private
   */
  handleClientCtrlResponse(clientCtrlMsg) {
    const sEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_PROTOCOL_CLIENTCTRL });
    sEvent.smfMsg = clientCtrlMsg;
    this.processEvent(sEvent);
  }

  /**
   * @private
   */
  handleClientCtrlTimeout() {
    const { LOG_INFO } = this.logger;
    LOG_INFO('ClientCtrl timeout for session');
    const sEvent = new SessionFSMEvent({ name: SessionEventName.DOWNGRADE_TIMEOUT });
    this.processEvent(sEvent);
  }


  /**
   * @private
   */
  handleConnectTimeout() {
    const { LOG_INFO } = this.logger;
    LOG_INFO('Connection timeout. Disconnecting');
    const sEvent = new SessionFSMEvent({ name: SessionEventName.CONNECT_TIMEOUT });
    this.processEvent(sEvent);
  }

  /**
   * Handle control request timeout
   * @param {String} correlationTag The correlation tag for the timed out operation
   * @param {String} timeoutMsg The message associated with the timeout
   * @private
   */
  handleUpdatePropertyTimeout(correlationTag, timeoutMsg) {
    const { LOG_ERROR } = this.logger;
    // remove request from queue
    try {
      const result = delete this._correlatedReqs[correlationTag];
      if (!result) {
        LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`);
      }
    } catch (e) {
      LOG_ERROR(`Cannot delete ctrl request ${correlationTag}, exception: ${e.message}`);
    }

    // notify client
    const sessionEvent = SessionEvent.build(SessionEventCode.PROPERTY_UPDATE_ERROR,
                                            timeoutMsg,
                                            null,
                                            ErrorSubcode.TIMEOUT,
                                            null,
                                            null);
    this.sendEvent(sessionEvent);
  }

  /**
   * @param {SMFHeader} header The header from the rejected message
   * @private
   */
  handleRejectedTrMessage(header) {
    // It is trmsg response. For direct message, it must be a failure response
    const respCode = header.pm_respcode;
    // Strip a trailing null character.
    const topicNameRaw = header.pm_tr_topicname_bytes;
    const topicName = topicNameRaw ? topicNameRaw.replace(/\0/g, '') : '';
    const respText = header.pm_respstr;
    const errorSubcode = ErrorResponseSubcodeMapper.getErrorSubcode(respCode, respText);
    this.emitSessionEvent(SessionEvent.build(
      SessionEventCode.REJECTED_MESSAGE_ERROR,
      respText,
      respCode,
      errorSubcode,
      null, //correlation key
      `Topic: ${topicName}`)
    );
  }

  /**
   * @param {solace.Message} message The SMF message to handle
   * @returns {undefined}
   * @private
   */
  handleSMFMessage(message) {
    try {
      // Stats for this message might affect both the session and the flow. Need to
      // determine where the message is handled before handling stats.
      // Some ADCTRL messages have no flow ID but only a correlation tag.
      // These are OPENFLOW and we count them as handled by the session.

      const header = message.smfHeader;

      if (header.discardMessage) {
        // UH==2 on an unknown parameter
        if (this._sessionStatistics) {
          this._sessionStatistics.incStat(StatType.RX_DISCARD_SMF_UNKNOWN_ELEMENT);
        }
        // do nothing.
        return null;
      }

      // Each message type handler must call this.updateRxStats(message, target).
      // The direct/TRmsg path includes the call here in the switch.
      switch (header.smf_protocol) {
        case SMFLib.SMFProtocol.TRMSG:
          if (header.smf_adf) {
            return this.handleADTrMessage(message, header);
          }
          this.updateRxStats(message, this._sessionStatistics);
          return header.pm_respcode === 0
            ? this._session.handleDataMessage(message)
            : this.handleRejectedTrMessage(header);

        case SMFLib.SMFProtocol.ADCTRL:
          return this.handleADCtrlMessage(message, header);

        case SMFLib.SMFProtocol.CLIENTCTRL:
          return this.handleClientCtrlMessage(message, header);

        case SMFLib.SMFProtocol.SMP:
          return this.handleSMPMessage(message, header);

        case SMFLib.SMFProtocol.KEEPALIVE:
        case SMFLib.SMFProtocol.KEEPALIVEV2:
          // do nothing
          return null;

        default:
          return this.handleUnknownProtocolMessage(message, header);
      }
    } catch (e) {
      const { LOG_ERROR } = this.logger;
      LOG_ERROR(`Exception in handleSMFMessage, exception: ${e.stack}`);
      return this.errorInFsm(`Exception in handleSMFMessage: ${e.message}`,
                             e.subcode || ErrorSubcode.INTERNAL_ERROR,
                             e);
    }
  }

  /**
   * @param {TransportError} transportError The SMF parsing error passed from the transport
   * @returns {undefined}
   * @private
   */
  handleSMFParseError(transportError) {
    // fatal connection error

    // notify client
    return this.errorInFsm(transportError,
                           ErrorSubcode.PROTOCOL_ERROR);
  }

  handleSMPMessage(message, header) {
    this.updateRxStats(message);

    // find matching correlationTag to cancel timer
    const cancelledRequest = this.cancelOutstandingCorrelatedReq(header.pm_corrtag || '');
    /*
     * If we find the correlationTag and it has a callback associated with it
     * then call that callback.  It is likely calling back straight to the application
     * due to a call to session.subscribe() or session.unsubscribe() or
     * session.updateProperty().
     * Otherwise just send a TRANSPORT_PROTOCOL_SMP event to the FSM.
     */
    if (Check.nothing(cancelledRequest) || Check.nothing(cancelledRequest.respRecvdCallback)) {
      /*
       * correlation tag not found, probably an error response to a subscription
       * request that did not request-confirm,  or it could be one of the
       * FSM generated subscriptions and the callback is the anonymous function
       * in state WaitingForSubConfirmm
       */
      const sEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_PROTOCOL_SMP });
      sEvent.smfMsg = message;
      return this.processEvent(sEvent);
    }

    // calling through the callback, this should  be a callback in the _session for
    // applicated generated subscribe/unsubscribe/updateProperty(clientName)
    return cancelledRequest.respRecvdCallback(message, cancelledRequest);
  }

  /**
   * @param {Number} respCode The router response code
   * @param {String} respText The router response text
   * @param {String} subscriptionStr The subscription cache key
   * @param {CorrelatedRequest} request The associated request
   * @param {Boolean} confirm Whether the user wanted confirmation for the request
   * @private
   */
  handleSubscriptionUpdateError(respCode, respText, subscriptionStr, request, confirm) {
    const errorSubcode = ErrorResponseSubcodeMapper.getErrorSubcode(respCode, respText);

    //
    // if it is a not-found or already-present error, it does not affect our subscription
    // cache, otherwise remove it from the cache.
    //
    if (!(errorSubcode === ErrorSubcode.SUBSCRIPTION_ALREADY_PRESENT ||
      errorSubcode === ErrorSubcode.SUBSCRIPTION_NOT_FOUND)) {
      // remove from cache
      this.removeFromSubscriptionCache(subscriptionStr);
    }

    //
    // notify the client
    //
    this._session.handleSubscriptionUpdateError(respCode,
                                                respText,
                                                subscriptionStr,
                                                request,
                                                confirm);
  }

  /**
   * @param {String} correlationTag The correlation tag for the timed out request
   * @private
   */
  handleSubscriptionTimeout(correlationTag) {
    // remove request from queue
    const { LOG_ERROR } = this.logger;
    try {
      let result = false;
      if(this._correlatedReqs) {
        result = delete this._correlatedReqs[correlationTag];
      }
      if (!result) {
        LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`);
      }
    } catch (e) {
      LOG_ERROR(`Cannot delete ctrl request ${correlationTag}`, e);
    }
    const sEvent = new SessionFSMEvent({ name: SessionEventName.SUBSCRIBE_TIMEOUT });
    this.processEvent(sEvent);
  }

  /**
   * @param {TransportSessionEvent} transportEvent The event to handle
   * @returns {undefined}
   * @private
   */
  handleTransportEvent(transportEvent) {
    const { LOG_INFO, LOG_WARN } = this.logger;
    const infoStr = transportEvent.getInfoStr() || '';
    LOG_INFO(`Receive transport event: ${transportEvent}`);

    let sEvent;

    switch (transportEvent.getTransportEventCode()) {
      case TransportLib.TransportSessionEventCode.UP_NOTICE:
        sEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_UP });
        sEvent.sessionId = transportEvent.getSessionId();
        this.processEvent(sEvent);
        break;

      case TransportLib.TransportSessionEventCode.DESTROYED_NOTICE:
        sEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_DESTROYED });
        sEvent.sessionId = transportEvent.getSessionId();
        sEvent.eventText = infoStr;
        sEvent.errorSubcode = transportEvent.getSubcode();
        sEvent.eventReason = transportEvent;
        this._smfClient = null; // calling reset does nothing useful here
        this._transport = null; // transport has been destroyed
        this.processEvent(sEvent);
        break;

      case TransportLib.TransportSessionEventCode.CAN_ACCEPT_DATA:
        //
        // unblock the publisher too
        this.GuaranteedFlowControlledRelief();
        sEvent = new SessionFSMEvent({ name: SessionEventName.TRANSPORT_CAN_ACCEPT_DATA });
        sEvent.sessionId = transportEvent.getSessionId();
        this.processEvent(sEvent);
        break;

      case TransportLib.TransportSessionEventCode.SEND_ERROR:
        sEvent = new SessionFSMEvent({ name: SessionEventName.SEND_ERROR });
        sEvent.sessionId = transportEvent.getSessionId();
        sEvent.eventText = transportEvent.getInfoStr();
        sEvent.errorSubcode = transportEvent.getSubcode();
        sEvent.eventReason = transportEvent;
        this.processEvent(sEvent);
        break;

      case TransportLib.TransportSessionEventCode.DATA_DECODE_ERROR:
      case TransportLib.TransportSessionEventCode.PARSE_FAILURE:
        // fatal connection error
        return this.errorInFsm(transportEvent.getInfoStr(),
                               transportEvent.getSubcode());
      default:
        LOG_WARN('Received unknown transport session event', transportEvent);
    }

    return true;
  }

  /**
   * @param {BaseMessage|Message} message The message to handle
   * @param {SMFHeader} [header] The SMF header from the message, if available
   * @returns {undefined}
   * @private
   */
  handleUnknownProtocolMessage(message, header) {
    const { LOG_INFO, LOG_ERROR } = this.logger;
    // unknown protocol
    this.updateRxStats(message);
    if (header && header.smf_protocol === SMFLib.SMFProtocol.TSESSION) {
      // change state
      LOG_ERROR(`Received transport session message instead of SMF message, protocol 0x${
        formatHexString(header.smf_protocol)}`);
      LOG_ERROR(`Transport MessageType=${message.messageType}, target sessionId=${
        formatHexString(message.sessionId)}`);
      // notify client
      return this.errorInFsm('Received message with unknown protocol',
                             ErrorSubcode.PARSE_FAILURE);
    }

    // Drop message of unknown protocol and increment stats
    if (this._sessionStatistics) {
      this._sessionStatistics.incStat(StatType.RX_DISCARD_SMF_UNKNOWN_ELEMENT);
    }
    LOG_INFO(`Drop message with unknown protocol 0x${formatHexString(header.smf_protocol)}`);
    return null;
  }


  /**
   * Increments a session statistic
   *
   * @param {StatType} statType The key to increment
   * @param {?Number} value The amount to increment the value by
   * @returns {?Number} The new value for the statistic
   * @private
   */
  incStat(statType, value) {
    return this._sessionStatistics ? this._sessionStatistics.incStat(statType, value) : undefined;
  }

  /**
   * @private
   */
  initTransport() {
    const { LOG_INFO } = this.logger;
    const host = this._currentHost;
    LOG_INFO(`Creating transport session ${host}`);
    this._kaStats = { lastMsgWritten: 0, lastBytesWritten: 0 };

    this._smfClient = new TransportLib.SMFClient(
      rxData => this.handleSMFMessage(rxData),
      rxError => this.handleSMFParseError(rxError),
      this
    );

    this._transport = TransportLib.TransportFactory.createTransport(
      host,
      transportEvent => this.handleTransportEvent(transportEvent),
      this._smfClient,
      this._sessionProperties.clone(),
      () => this.sessionIdHex
    );
    this.injectTransportInterceptor(this._transportInterceptor);
  }

  /**
   * Installs a transport interceptor for the current and any future transports.
   *
   * The object methods .installed(instance) and .removed(instance) will be called if present, with
   * the transport instance as a parameter.
   *
   * To stop using transport interceptors, supply a null interceptor argument.
   *
   * @param {Object} interceptor An object given access to the internals of the transport instance.
   * @private
   */
  injectTransportInterceptor(interceptor) {
    this._transportInterceptor = interceptor;
    if (this._transport) {
      this._transport.setInterceptor(interceptor);
    }
  }

  /**
   * Call from keep alive scheduled task
   * @returns {undefined}
   * @private
   */
  keepAliveTimeout() {
    const { LOG_TRACE, LOG_DEBUG, LOG_INFO } = this.logger;
    LOG_TRACE('KeepAlive timeout');

    // session is in connected state but hasn't received keep alive response
    // Less than or equal to because this is the number of *already sent* KAs
    if (this._keepAliveCounter >= this._sessionProperties.keepAliveIntervalsLimit) {
      LOG_INFO(`Exceed maximum keep alive intervals limit ${
                this._sessionProperties.keepAliveIntervalsLimit}`);
      // stop timers
      LOG_DEBUG('Stop keep alive timer');
      if (this._keepAliveTimer) {
        clearInterval(this._keepAliveTimer);
      }

      // change session state
      return this.errorInFsm('Exceed maximum keep alive intervals limit',
                             ErrorSubcode.KEEP_ALIVE_FAILURE);
    }

    LOG_TRACE('About to send keep alive');

    const clientStats = this._transport.getClientStats();
    const prestatMsgWritten = clientStats.msgWritten;
    const prestatBytesWritten = clientStats.bytesWritten;

    const kaMsg = new SMFLib.KeepAliveMessage();
    const returnCode = this.send(kaMsg, null, true);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      /*
       * TransportReturnCode.NO_SPACE is not possible.
       * Send is called with the forceAllowEnqueue parameter.
       * So whatever error is returned is fatal
       */
      return this.errorInFsm('Cannot send keep alive message',
                             ErrorSubcode.KEEP_ALIVE_FAILURE);
    }

    // We need to avoid incrementing the KA counter if we're in the process of
    // sending a huge message and we've had no opportunity to write a KA message.
    // Detection: last KA's snapshot of messages written is unchanged, but number of bytes
    // written has gone up.
    if (this._kaStats.lastMsgWritten === prestatMsgWritten &&
      this._kaStats.lastBytesWritten < prestatBytesWritten) {
      LOG_DEBUG('Keep alive sent',
                'Not incrementing keep alive counter due to large message send',
                `KA count = ${this._keepAliveCounter}`
      );
    } else {
      this._keepAliveCounter++;
      LOG_TRACE(`Last message written: ${this._kaStats.lastMsgWritten}`);
      LOG_TRACE(`Last bytes written: ${this._kaStats.lastBytesWritten}`);
      LOG_TRACE(`Keep alive sent, increment keep alive counter, keep alive count = ${
        this._keepAliveCounter}`);
    }
    this._kaStats.lastBytesWritten = clientStats.bytesWritten;
    this._kaStats.lastMsgWritten = clientStats.msgWritten;

    return true;
  }

  /**
   * Prepares a message for sending and then invoke the transport
   * send method. For guaranteed messages, pass the transport send
   * method to the publisher so it can be invoked from there and errors
   * handled within the publisher FSM.
   * @param {BaseMessage} message The message to prepare.
   * @private
   */
  prepareAndSendMessage(message) {
    if (message instanceof Message) {
      let returnCode;
      // Delegate message preparation where appropriate
      const deliveryMode = message.getDeliveryMode();
      switch (deliveryMode) {

        case MessageDeliveryModeType.DIRECT:
          if (!this._transport) return;
          message._payload_is_memoized = false;
          message._memoized_csumm = null;
          message._memoized_payload = null;
          returnCode = this.sendToTransport(message);
          break;

        case MessageDeliveryModeType.PERSISTENT:
        case MessageDeliveryModeType.NON_PERSISTENT:
          if (!this._defaultPublisher) {
            const reason = this._session.adLocallyDisabled
                ? 'locally disabled'
                : 'remotely unsupported';
            throw new OperationError('Session does not provide Guaranteed Message Publish capability',
                                       ErrorSubcode.GM_UNAVAILABLE,
                                       reason);
          } else {
            if (this._gmSendDisallowed) this._gmSendDisallowed(); // throws if present
            returnCode = this._defaultPublisher.prepareAdMessageAndSend(message);
          }
          break;

        default: {
          const { LOG_ERROR } = this.logger;
          LOG_ERROR('Unhandled message delivery mode', MessageDeliveryModeType.describe(deliveryMode));
        }
      }

      if (returnCode !== TransportLib.TransportReturnCode.OK) {
        if (returnCode === TransportLib.TransportReturnCode.NO_SPACE) {
          // Must be DIRECT message because the Publisher handles NO_SPACE returnCode
          // internally
          this._userBackpressured = true;
          throw new OperationError('Cannot send message - no space in transport',
            ErrorSubcode.INSUFFICIENT_SPACE,
            TransportLib.TransportReturnCode.describe(returnCode));
        }

        // This is a fatal session error
        this.setCurrentError(new OperationError('Cannot send message',
            ErrorSubcode.INVALID_OPERATION,
            TransportLib.TransportReturnCode.describe(returnCode)));
        this.processEvent(new SessionFSMEvent({ name: SessionEventName.EXCEPTION }));
      }
    }
  }

  GuaranteedFlowControlledRelief() {
    if (this._defaultPublisher) {
      this._defaultPublisher.processFSMEvent(
        new PublisherLib.PublisherFSMEvent({
          name: PublisherLib.PublisherFSMEventNames.CAN_SEND }));
    }
  }
  /**
   * Reapply subscriptions.
   * @returns {Boolean} True if all subscriptions were reapplied. False if WOULD_BLOCK.
   * @private
   */
  reapplySubscriptions() {
    const { LOG_INFO, LOG_DEBUG } = this.logger;
    const { SolclientFactory: { createTopicDestination } } = SolclientFactoryLib;
    LOG_INFO(`Reapplying subscriptions, count=${this._subscriptionCacheKeys.length}`);
    // add subscriptions and ask for confirm on last one

    if (!this._subscriptionCacheKeys) {
      // Nothing to do
      return true;
    }

    try {
      while (this._subscriptionCacheKeys.length) {
        //
        // The key is the topic string (topic.getName()).
        // sendSubscribe() requires a destination, so we encode it here.
        //
        // on entry to reapplySubscriptions we added the P2P topic to
        // subscriptionCacheKeys but there is no corresponding entry in
        // _subscriptionCache so we no longer use the key to index that actual
        // cache.
        // If it becomes necesary to pass the Topic object to sendSubscribe in a
        // a future enhancement (perhaps to remember flags like request-confirm) then
        // this logic here needs to be revisited.
        const key = this._subscriptionCacheKeys.shift();
        const requestConfirmation = this._subscriptionCacheKeys.length === 0;
        const topicDestination = createTopicDestination(key);
        const rc = this.sendSubscribe(topicDestination,
                                      requestConfirmation,
                                      null,
                                      this._sessionProperties.readTimeoutInMsecs,
                                      null);
        if (rc !== TransportLib.TransportReturnCode.OK) {
          this.errorInFsm(
            `Error occurred sending subscription: ${TransportLib.TransportReturnCode.describe(rc)}`,
            ErrorSubcode.INTERNAL_ERROR
          );
        }
      }
    } catch (e) {
      if (e instanceof OperationError && e.subcode === ErrorSubcode.INSUFFICIENT_SPACE) {
        LOG_DEBUG('Apply subscriptions blocked due to insufficient space, wait for can accept data event');
        return false;
      }
      this.errorInFsm(`Unexpected expection occurred while reapplying subscriptions: ${e}`,
                      e.subcode || ErrorSubcode.INTERNAL_ERROR,
                      e);
    }
    return true;    // sent all subscriptions
  }


  /**
   * @param {Destination} topic The topic to remove from the subscription cache
   * @returns {?Destination} The value in the subscription cache at that key
   * @private
   */
  removeFromSubscriptionCache(topic) {
    if (Check.nothing(topic) || !this._subscriptionCache) {
      return null;
    }

    const { LOG_DEBUG, LOG_ERROR } = this.logger;
    const key = (topic instanceof Destination) ? topic.name : topic;
    LOG_DEBUG(`Remove subscription ${key}`);
    const sub = this._subscriptionCache[key];
    if (sub === undefined || sub === null) {
      return null;
    }

    try {
      const result = delete this._subscriptionCache[key];
      if (!result) {
        LOG_ERROR(`Cannot remove subscription ${key}`);
      } else {
        this._subscriptionCacheCount--;
      }
    } catch (e) {
      LOG_ERROR(`Cannot remove subscription ${key}`, e);
    }
    return sub;
  }


  /**
   * Reset the FSM state and release all objects. This method is called once from
   * SessionFSM.onInitial
   * @private
   */
  reset() {
    this.resetStats();

    /**
     * The following fields are disposed when disconnect is called
     * and recreated when connect is called again.
     */
    this.sessionId = null;

    // Need to reschedule keepAliveTimer when some other write operation happens
    this._keepAliveTimer = null;
    this.resetKeepAliveCounter();
    this._correlatedReqs = {};

    this._disposed = false;

    this._smfClient = null;
    this._kaStats = { lastMsgWritten: 0, lastBytesWritten: 0 };

    /**
     * The following fields are destroyed when dispose is called
     * and cannot be reinitialized.
     */
    this._subscriptionCache = null;
    this._subscriptionCacheKeys = null;
    this._subscriptionCacheCount = 0;
    if (this._sessionProperties.reapplySubscriptions) {
      this._subscriptionCache = {};
    }

    // When negotiating the initial transport, we can fail and transparently reconnect.
    // this.resetTransportProtocolHandler();
    // this._lastKnownGoodTransport = null;

    // event and error information
    this._eventCode = null;
    this._responseCode = null;
    this.eventText = null;
    this.errorSubcode = null;
    this.eventReason = null;
  }

  /**
   * Resets the keep-alive counter to zero. `keepAliveTimeout` compares this counter
   * against the configured keep-alive intervals limit.
   *
   * @private
   */
  resetKeepAliveCounter() {
    // Reset the KA counter. Called by the SMFClient on each SMF chunk received (whether full
    // message or not).
    this._keepAliveCounter = 0;
  }

  /**
   * @returns {?} The result of calling resetStats().
   * @private
   */
  resetStats() {
    return this._sessionStatistics ? this._sessionStatistics.resetStats() : undefined;
  }


  /**
   * Schedule keep alive task
   * @private
   */
  scheduleKeepAlive() {
    const { LOG_DEBUG, LOG_ERROR } = this.logger;
    const { keepAliveIntervalInMsecs } = this._sessionProperties;
    if (keepAliveIntervalInMsecs === 0) {
      // Keepalives disabled
      return;
    }

    if (this._keepAliveTimer) {
      clearInterval(this._keepAliveTimer);
    }

    this._keepAliveTimer = setInterval(() => {
      try {
        this.keepAliveTimeout();
      } catch (e) {
        LOG_ERROR('Error occurred in keepAliveTimeout', e);
      }
    }, keepAliveIntervalInMsecs);

    LOG_DEBUG(`Create Keepalive timer with interval: ${keepAliveIntervalInMsecs}ms`);
  }

  /**
   * Provide a method for the session object to use to send
   * messages to the transport.  This method is only used internally by
   * the FSM.  The session send() API should invoke sendToTransport() directly
   * so errors are thrown back to the application.
   *
   * @param {solace.Message} message The message to send
   * @param {Object} [statTarget=this._sessionStatistics] The sender of this message, for stats.
   * @param {Boolean} [forceAllowEnqueue=false] Set to true to force enqueueing of control messages
   *
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  send(message, statTarget = this._sessionStatistics, forceAllowEnqueue = false) {
    try {
      return this.sendToTransport(message, statTarget, forceAllowEnqueue);
    } catch (ex) {
      const { LOG_TRACE } = this.logger;
      // The send operation threw (or we threw locally), which is always a SessionException.
      LOG_TRACE(`Error sending message: ${ex.message}: ${ex.stack}`);
      this.errorInFsm(`Send operation failed: ${ex.message}`,
                      ex.subcode || ErrorSubcode.CONNECTION_ERROR);
    }
    return TransportLib.TransportReturnCode.CONNECTION_ERROR;
  }

  /**
   * Provide a method for the session object to use to send
   * messages to the transport.
   *
   * @param {solace.Message} message The message to send
   * @param {Object} [statTarget=this._sessionStatistics] The sender of this message, for stats.
   * @param {Boolean} [forceAllowEnqueue=false] Set to true to force enqueueing of control messages
   *
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  sendToTransport(message, statTarget = this._sessionStatistics, forceAllowEnqueue = false) {
    let returnCode = TransportLib.TransportReturnCode.CONNECTION_ERROR;

    if (!this._transport) {
      throw new OperationError('Transport has been destroyed', ErrorSubcode.INTERNAL_ERROR);
    }
    const content = SMFLib.Codec.Encode.encodeCompoundMessage(message);
    returnCode = this._transport.send(content, forceAllowEnqueue);
    switch (returnCode) {
      case TransportLib.TransportReturnCode.OK:
        this.updateTxStats(message, statTarget);
        break;
      case TransportLib.TransportReturnCode.NO_SPACE:
        if (!forceAllowEnqueue) {
          // This is allowed.
          break;
        }
      // Else fall through
      default:
        throw new OperationError(`Transport returned ${TransportLib.TransportReturnCode.describe(returnCode)}`,
          ErrorSubcode.INTERNAL_ERROR);
    }

    return returnCode;
  }


  /**
   * Initiates the ClientCtrl handshake, called from transportSessionEvent callback
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  sendClientCtrlLogin() {
    const { LOG_INFO, LOG_DEBUG, LOG_TRACE } = this.logger;

    this._compressedTLS =
      ((this._sessionProperties.compressionLevel > 0) &&
        (this._currentHost.match(/tcps:/i) !== null));
    this._plaintextTLS =
      ((this._currentHost.match(/tcps:/i) !== null) &&
        (this._sessionProperties.sslConnectionDowngradeTo === SslDowngrade.PLAINTEXT));
    LOG_TRACE(`sendClientLogin plaintextTLS: ${this._plaintextTLS} _compressedTLS: ${this._compressedTLS}`);

    // Don't use the correlation tag. For Login only, the router won't return it.
    const clientCtrlMsg = SMFLib.ClientCtrlMessage.getLogin(this._sessionProperties,
                                                            this._compressedTLS,
                                                            this._plaintextTLS);
    const returnCode = this.send(clientCtrlMsg);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      this._responseCode = null;
      this.eventReason = null;
      // notify client
      if (returnCode === TransportLib.TransportReturnCode.NO_SPACE) {
        this.eventText = 'Cannot send client control - no space in transport';
        this.errorSubcode = ErrorSubcode.INSUFFICIENT_SPACE;
      } else {
        LOG_INFO(`Cannot send client ctrl, return code
          ${TransportLib.TransportReturnCode.describe(returnCode)}`);
        this.eventText = 'Cannot send client ctrl';
        this.errorSubcode = ErrorSubcode.INVALID_OPERATION;
      }
    } else {
      // enqueue outstanding request, use a fake correlation tag for Login only
      const correlationTag = TransportLib.SMFClient.SMF_CLIENTCTRL_LOGIN_FAKE_CORRELATIONTAG;
      LOG_INFO(`Using internally correlationTag=${correlationTag} for tracking ClientCTRL Login`);
      this.enqueueOutstandingCorrelatedReq(correlationTag,
                                           null,
                                           null,
                                           null,
                                           rxMsgObj => this.handleClientCtrlResponse(rxMsgObj));

      LOG_DEBUG('Sent client ctrl');
    }
    return returnCode;
  }

  /**
   *
   * Internal method for sending subscriptions from SessionConnecting state (reapply or P2P-inbox)
   *
   * @param {Destination} topic The topic to subscribe
   * @param {Boolean} requestConfirmation If true, expect a reply on success also
   * @param {String} correlationKey The correlation key for the request
   * @param {Number} requestTimeout The timeout for the request
   * @param {function} respRecvdCallback The callback on reply received
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  sendSubscribe(topic, requestConfirmation, correlationKey, requestTimeout, respRecvdCallback) {
    const { LOG_INFO, LOG_DEBUG } = this.logger;
    assert(topic instanceof Destination, 'sendSubscribe requires a Destination, not a string');
    LOG_DEBUG('Sending subscribe: ', topic, requestConfirmation, correlationKey);
    const correlationTag = this.getCorrelationTag();
    const smpMsg = SMFLib.SMPMessage.getSubscriptionMessage(correlationTag,
                                                            topic,
                                                            true, // set add == true
                                                            requestConfirmation);
    assert(smpMsg.encodedUtf8Subscription, 'Encoded SMP message was invalid');
    const returnCode = this.send(smpMsg);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      LOG_INFO('Subscribe failed', TransportLib.TransportReturnCode.describe(returnCode));
      return returnCode;
    }
    if (requestConfirmation) {
      this.enqueueOutstandingCorrelatedReq(
        correlationTag,
        () => this.handleSubscriptionTimeout(correlationTag),
        requestTimeout || this._sessionProperties.readTimeoutInMsecs,
        correlationKey,
        respRecvdCallback);
    }
    return returnCode;
  }

  /**
   * Sends a client control update message that changes a mutable session property.
   *
   * @param {MutableSessionProperty} mutableSessionProperty The property key to change
   * @param {?} newValue The new value for the property
   * @param {String} correlationKey The correlation key for the request
   * @param {Number} requestTimeout The timeout for the request
   * @param {function} respRecvdCallback The callback on response
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  sendUpdateProperty(mutableSessionProperty,
                     newValue,
                     correlationKey,
                     requestTimeout,
                     respRecvdCallback) {
    const correlationTag = this._smfClient.nextCorrelationTag();
    const smpMsg = SMFLib.ClientCtrlMessage.getUpdate(mutableSessionProperty,
                                                      newValue,
                                                      correlationTag);

    const returnCode = this.send(smpMsg);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      return returnCode;
    }

    this.enqueueOutstandingCorrelatedReq(
      correlationTag,
      () => this.handleUpdatePropertyTimeout(correlationTag),
      requestTimeout || this._sessionProperties.readTimeoutInMsecs,
      correlationKey,
      respRecvdCallback);
    return returnCode;
  }

  /**
   * @private
   */
  setClientCtrlTimer() {
    this.clearClientCtrlTimer();

    this._clientCtrlTimer = setTimeout(() => this.handleClientCtrlTimeout(),
                                       this._sessionProperties.transportDowngradeTimeoutInMsecs);
  }

  /**
   * @private
   */
  setConnectTimer() {
    this.clearConnectTimer();
    // The hosts lists provides the wait time for inter-host timeouts.
    // This timeout is for the entire list.
    this._connectTimer = setTimeout(() => this.handleConnectTimeout(),
                                    this._sessionProperties.connectTimeoutInMsecs);
  }

  /**
   * Sets the error exit information for the FSM.
   *
   * This applies key-value pairs from properties to the current error object,
   * but does not allow overwriting and does not allow assignment of null or undefined values.
   *
   * Only the following fields are relevant, any other fields transferred to currentError by this
   * method are eventually ignored:
   *    * eventText
   *    * responseCode
   *    * errorSubcode
   *    * eventReason
   *
   * See SessionConnecting exitPoint 'errorExit'. This is the only place the information in
   * currentError is extracted.
   *
   * Could be implemented as `
   * this._currentError = Object.assign({}, filter(properties), this._currentError)
   * `
   * where `filter` is a key-value filter that works as described above.
   *
   * @param {Object} source An object with properties to be applied
   */
  setCurrentError(source) {
    const target = this._currentError || {};
    const { LOG_TRACE } = this.logger;
    Object.keys(source).forEach((key) => {
      if (source[key] === null || source[key] === undefined) return false;
      if (target[key] !== null && target[key] !== undefined) {
        LOG_TRACE(`Attempt to overwrite property {key=${key}, current value=${target[key]}, incoming value=${source[key]}}`);
        return false;
      }
      target[key] = source[key];
      return true;
    });
    this._currentError = target;
  }


  /**
   * Send a subscribe or unsubscribe request on behalf of the API.
   * @param {Destination} subject The target for the update
   * @param {Boolean} requestConfirmation Request a success message if true
   * @param {String} correlationKey The correlation key for the request
   * @param {Number} requestTimeout The timeout in milliseconds
   * @param {SessionRequestType} requestType The request type
   * @param {function} respRecvdCallback The callback on response
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  subscriptionUpdate(subject,
                     requestConfirmation,
                     correlationKey,
                     requestTimeout,
                     requestType,
                     respRecvdCallback) {
    //check topic supported
    const error = this.checkSessionDestinationCapability(subject);
    if (error) {
      throw error;
    }
    const timeoutMsg =
      SubscriptionUpdateTimeoutMessages[requestType] ||
      SubscriptionUpdateTimeoutMessages.default;
    const isSMP = (requestType !== SessionRequestType.REMOVE_DTE_SUBSCRIPTION);
    const add = (requestType === SessionRequestType.ADD_SUBSCRIPTION ||
      requestType === SessionRequestType.ADD_P2PINBOX);
    const correlationTag = this.getCorrelationTag();

    const generateMessage = isSMP
      ? SMFLib.SMPMessage.getSubscriptionMessage
      : SMFLib.AdProtocolMessage.getDTEUnsubscribeMessage;
    const msg = generateMessage(correlationTag,
                                subject,
                                add,
                                requestConfirmation);
    const returnCode = this.send(msg);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      return returnCode;
    }

    if (requestConfirmation) {
      this.enqueueOutstandingCorrelatedReq(
        correlationTag,
        () => this.handleApiSubscriptionTimeout(correlationTag,
                                                timeoutMsg),
        requestTimeout || this._sessionProperties.readTimeoutInMsecs,
        correlationKey,
        respRecvdCallback);
    }
    if (requestType === SessionRequestType.ADD_SUBSCRIPTION &&
      this._sessionProperties.reapplySubscriptions) {
      this.addToSubscriptionCache(subject);
    } else if (requestType === SessionRequestType.REMOVE_SUBSCRIPTION &&
      this._sessionProperties.reapplySubscriptions) {
      this.removeFromSubscriptionCache(subject);
    }
    return returnCode;
  }

  /**
   * Send a queue subscribe or unsubscribe request.
   * @param {Destination} subject The target for the update
   * @param {Destination} queue The queue where the subscription is added/removed
   * @param {Number} requestTimeout The timeout in milliseconds
   * @param {Boolean} add (if true) or remove (if false).
   * @param {function} respRecvdCallback The callback on response
   * @returns {TransportReturnCode} The RC from the transport
   * @private
   */
  queueSubscriptionUpdate(subject,
                          queue,
                          requestTimeout,
                          add,
                          respRecvdCallback) {
    const timeoutMsg =
      add ? SubscriptionUpdateTimeoutMessages[SessionRequestType.ADD_SUBSCRIPTION] :
      SubscriptionUpdateTimeoutMessages[SessionRequestType.REMOVE_SUBSCRIPTION];
    const correlationTag = this.getCorrelationTag();

    const msg = SMFLib.SMPMessage.getQueueSubscriptionMessage(
      correlationTag,
      subject,
      queue,
      add);
    const returnCode = this.send(msg);
    if (returnCode !== TransportLib.TransportReturnCode.OK) {
      //TODO: act on this.
      // (throw.)
      return returnCode;
    }

    this.enqueueOutstandingCorrelatedReq(
      correlationTag,
      () => {
        const origReq = this._correlatedReqs[correlationTag];
        //TODO maybe just use the cancel method instead (which returns the original request)
        this.handleApiSubscriptionTimeout(correlationTag,
                                          timeoutMsg);
        respRecvdCallback(null, origReq);
      },
      requestTimeout || this._sessionProperties.readTimeoutInMsecs,
      null, // no need for session machinery to track flow correlationKey.
      respRecvdCallback);
    return returnCode;
  }

  /**
   * @param {BaseMessage|Message} smfMessage The message received
   * @param {Stats} [target] The statistics target to update, default is session stats
   * @private
   */
  updateRxStats(smfMessage, target = this._sessionStatistics) {
    if (!target) {
      return;
    }
    const smfHeader = smfMessage.smfHeader;
    if (!smfHeader) {
      return;
    }
    const deliveryMode = smfHeader.pm_deliverymode || 0;
    const msgStatKey = STAT_RX_BYMODE_MSGS[deliveryMode];
    const bytesStatKey = STAT_RX_BYMODE_BYTES[deliveryMode];
    const msgLength = smfHeader.messageLength;

    switch (smfHeader.smf_protocol) {
      case SMFLib.SMFProtocol.TRMSG:
        if (smfHeader.pm_respcode === 0) {
          target.incStat(StatType.RX_TOTAL_DATA_MSGS);
          target.incStat(msgStatKey);
          target.incStat(StatType.RX_TOTAL_DATA_BYTES, msgLength);
          target.incStat(bytesStatKey, msgLength);
          if (smfHeader.smf_di) {
            target.incStat(StatType.RX_DISCARD_MSG_INDICATION);
          }
        }
        break;
      case SMFLib.SMFProtocol.CLIENTCTRL:
      case SMFLib.SMFProtocol.SMP:
      case SMFLib.SMFProtocol.KEEPALIVE:
      case SMFLib.SMFProtocol.KEEPALIVEV2:
      case SMFLib.SMFProtocol.ADCTRL:
        target.incStat(StatType.RX_CONTROL_MSGS);
        target.incStat(StatType.RX_CONTROL_BYTES, msgLength);
        break;
      default:
    }
  }

  /**
   * @param {BaseMessage|Message} smfMessage The message sent
   * @param {Stats} [target] The statistics target to update, default is session stats
   * @private
   */
  updateTxStats(smfMessage, target = this._sessionStatistics) {
    if (!target) {
      return;
    }
    if (smfMessage.getReplyTo !== undefined && smfMessage.getReplyTo()) {
      // update stats
      target.incStat(StatType.TX_REQUEST_SENT);
    }
    const smfHeader = smfMessage.smfHeader;
    if (!smfHeader) {
      return;
    }

    const deliveryMode = smfHeader.pm_deliverymode || 0;
    let msgStatKey = STAT_TX_BYMODE_MSGS[deliveryMode];
    let bytesStatKey = STAT_TX_BYMODE_BYTES[deliveryMode];
    //
    // If this is a Guaranteed Message we may need to further refine the stats by the redelivered
    // status
    if (deliveryMode !== MessageDeliveryModeType.DIRECT) {
      if (smfMessage.isRedelivered()) {
        msgStatKey = STAT_TX_BYMODE_REDELIVERED[deliveryMode];
        bytesStatKey = STAT_TX_BYMODE_BYTES_REDELIVERED[deliveryMode];
      }
    }
    const msgLength = smfHeader.messageLength;

    switch (smfHeader.smf_protocol) {
      case SMFLib.SMFProtocol.TRMSG:
        target.incStat(msgStatKey);
        target.incStat(bytesStatKey, msgLength);
        break;
      case SMFLib.SMFProtocol.CLIENTCTRL:
      case SMFLib.SMFProtocol.SMP:
      case SMFLib.SMFProtocol.KEEPALIVE:
      case SMFLib.SMFProtocol.KEEPALIVEV2:
      case SMFLib.SMFProtocol.ADCTRL:
        target.incStat(StatType.TX_CONTROL_MSGS);
        target.incStat(StatType.TX_CONTROL_BYTES, msgLength);
        break;
      default:
    }
  }

  /**
   * @param {ClientCtrlMessage} clientCtrlRespMsg The client control message with props
   * @private
   */
  updateReadonlySessionProps(clientCtrlRespMsg) {
    const props = this._sessionProperties; // Modify session properties in place

    props._setVpnNameInUse(clientCtrlRespMsg.getVpnNameInUseValue() || '');
    const oldVirtualRouterName = props.virtualRouterName;
    const newVirtualRouterName = clientCtrlRespMsg.getVridInUseValue() || '';
    props._setVirtualRouterName(newVirtualRouterName);
    if (oldVirtualRouterName !== '' && oldVirtualRouterName !== newVirtualRouterName) {
      this.handleVirtualRouterNameChange(oldVirtualRouterName, newVirtualRouterName);
    }

    // The Solace Message Router login response should always contain a P2P topic for this client
    // name. If it doesn't that's an error (and we store "").
    props._setP2pInboxBase(clientCtrlRespMsg.getP2PTopicValue() || '');
    props._setP2pInboxInUse(P2PUtil.getP2PInboxTopic(props.p2pInboxBase));
    this._session.updateCapabilities(clientCtrlRespMsg.getRouterCapabilities());

    // Create and cache a guard for GM sending.
    const gmCap = this._session._getCapability(CapabilityType.GUARANTEED_MESSAGE_PUBLISH);
    this._gmSendDisallowed = (typeof gmCap === 'boolean' && !gmCap)
      ? () => {
        throw new OperationError(
        'Sending guaranteed message is not allowed by router for this client',
        ErrorSubcode.INVALID_OPERATION,
        null);
      }
      : null;
  }

  handleVirtualRouterNameChange(oldName, newName) {
    if (this._consumers) {
      this._consumers.flows.forEach(consumer => consumer.onVRNChanged());
      this._consumers.reconnectingFlows.forEach(consumer => consumer.onVRNChanged());
    }
    this.emitSessionEvent(
      SessionEvent.build(SessionEventCode.VIRTUALROUTER_NAME_CHANGED,
                         `Virtual router name is changed from ${oldName} to ${newName}`,
                         null,
                         0,
                         null,
                         null));
  }

  /**
   * @returns {String} The session's ID in hexadecimal format
   * @private
   */
  get sessionIdHex() {
    return this.sessionId && formatHexString(this.sessionId) || 'N/A';
  }

  updateTransportCompression(callback) {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE('updateTransportCompression called. ' +
      `plaintextTLS: ${this._plaintextTLS} _compressedTLS: ${this._compressedTLS}`);
    if (this._plaintextTLS) {
      TransportLib.TransportFactory.severTls(this._transport, this._compressedTLS, callback);
      return null;
    }
    if (this._compressedTLS) {
      const newTransport = TransportLib.TransportFactory.startCompression(this._transport);
      return newTransport;
    }
    // nothing to do.
    return this._transport;
  }
}

// Export the SessionFSM class so that other modules can require it.
module.exports.SessionFSM = SessionFSM;
