const SMFLib = require('solclient-smf');
const { ApplicationAck,
        ApplicationAckRingBuffer,
        ApplicationAckState } = require('./application-acks');
const { assert } = require('solclient-eskit');
const { CapabilityType } = require('solclient-session');
const { ConsumerFSMEvent } = require('./consumer-fsm-event');
const { ConsumerFSMEventNames } = require('./consumer-fsm-event-names');
const { ConsumerStateNames } = require('./consumer-state-names');
const { DestinationFromNetwork,
        DestinationType,
        Queue,
        Topic } = require('solclient-destination');
const { ErrorResponseSubcodeMapper,
        ErrorSubcode,
        OperationError } = require('solclient-error');
const { LogFormatter } = require('solclient-log');
const { Long } = require('solclient-convert');
const { MessageConsumerAcknowledgeMode } = require('./message-consumer-acknowledge-modes');
const { MessageConsumerEventName } = require('./message-consumer-event-names');
const { MessageDispatcher } = require('./message-dispatcher');
const { PrivateFlowEventName } = require('solclient-flow');
const { QueueAccessType,
        QueueDescriptor,
        QueuePermissions,
        QueueProperties,
        QueueType } = require('solclient-queue');
const { MessageOutcome, RgmidFactory } = require('solclient-message');
const { State, StateMachine } = require('solclient-fsm');
const { Stats,
        StatType } = require('solclient-stats');
const { Timer } = require('solclient-events');
const { TransportAcks,
        TransportAckResult } = require('./transport-acks');

// Ring buffer capacity constant.
// NOTE(review): its consumer is not visible in this part of the file; presumably it sizes the
// ApplicationAckRingBuffer imported above -- confirm.
const RING_BUFFER_SIZE = 512;
// Shared application-ack record for message ID zero, created already in the ACKED_SENT state.
// assumption is that this first message (messageId = 0) settlement outcome is ACCEPTED (ACKed)
const ZERO_APP_ACK = new ApplicationAck(Long.UZERO, ApplicationAckState.ACKED_SENT, MessageOutcome.ACCEPTED);


class ConsumerFSM extends StateMachine {
  constructor({ name, consumer, sessionInterface, properties } = {}) {
    super({ name });

    const fsm = this;
    // Log decorator: prefixes every log call's arguments with the session ID and consumer flow ID.
    // Both values are read at call time, so later changes to them are reflected in the output.
    const formatter = function formatter(...args) {
      const sessionTag = `[session=${sessionInterface.sessionIdHex}]`;
      const flowTag = `[message-consumer-fsm=${consumer.flowIdDec}]`;
      return [sessionTag, flowTag].concat(args);
    };
    const logger = this.logger = new LogFormatter(formatter);
    const {
      LOG_TRACE,
      LOG_DEBUG,
      LOG_INFO,
      LOG_WARN,
      LOG_ERROR,
    } = logger;
    this.log = logger.wrap(this.log, this);

    const autoAck = properties.acknowledgeMode === MessageConsumerAcknowledgeMode.AUTO;

    this._consumer = consumer;
    this._sessionInterface = sessionInterface;

    // Save copies of properties accessed outside the constructor
    this._acknowledgeTimeoutInMsecs = properties.acknowledgeTimeoutInMsecs;
    this._acknowledgeThreshold = properties.acknowledgeThreshold;
    this._localPreferredWindowSize = properties.windowSize;
    this._localMaxWindowSize = properties.windowSize;
    this._hasAutoAckSupport = autoAck;

    this._messageDispatch = new MessageDispatcher({ emitter: consumer, autoAck, logger });

    this._stats = new Stats();

    this._resetRemoteConnectionState();
    this._resetLocalConnectionState();

    // Flag to delay flow dispose() for autoAck.
    this._midDispatch = false;
    // Save a copy to clear out on first successful bind
    this._replayStartLocation = properties.replayStartLocation;

    this._errorCausingReconnect = null;
    //**********************************************************************************************
    // A bit of infrastructure around post-event actions, which includes maintaining a queue of
    // pending actions, and some logic to pre-process the actions before performing them.

    // These should not be the same as any MessageConsumerEventName value or they will collide in
    // the actionLocations object during event pre-processing.
    const emitEventActionType = 'EMIT';
    const startDispatchActionType = 'DISPATCH';
    const stopDispatchActionType = 'NO_DISPATCH';

    let postEventActions = [];
    /**
     * Scans the queued post-event actions from `startIndex` onwards and nulls out pairs of actions
     * that negate each other, so that neither is executed later:
     *   - an emitted UP followed by DOWN or DOWN_ERROR,
     *   - an emitted ACTIVE followed by INACTIVE,
     *   - a start-dispatch followed by a stop-dispatch.
     *
     * This is deliberately not an exhaustive search of every event sequence; only sequences known
     * to be possible are handled. Entries before `startIndex` and already-nulled entries are
     * skipped. The queue is modified in place.
     *
     * @param {number} startIndex - index of the first action to consider
     * @private
     */
    function preprocessPostEventActions(startIndex) {
      LOG_TRACE(`Pre-processing post event action list @${startIndex}`, postEventActions);

      // Queue index of the latest action that can still be cancelled, keyed by emitted event name
      // or by action type (which is why action types must not collide with event names).
      const pendingIndexByKey = {};

      // Nulls both the remembered action and the action at `cancellerIndex`; no-op when nothing
      // is remembered under `firstActionName`.
      const cancelPair = (list, cancellerIndex, firstActionName, cancellingActionName) => {
        const firstLocation = pendingIndexByKey[firstActionName];
        if (firstLocation === undefined) {
          return;
        }
        LOG_TRACE(`Cancelling post event action ${firstActionName}@${firstLocation} with ${
          cancellingActionName}@${cancellerIndex}`);
        pendingIndexByKey[firstActionName] = undefined;
        list[firstLocation] = null;
        list[cancellerIndex] = null;
      };

      postEventActions.forEach((action, index, list) => {
        if ((index < startIndex) || (!action)) {
          LOG_TRACE(`Skip n/a action@${index}, start: ${startIndex}, action:`, action);
          return;
        }

        const { type, data } = action;
        if (type === emitEventActionType) {
          const { UP, ACTIVE, RECONNECTED, DOWN, DOWN_ERROR, INACTIVE } = MessageConsumerEventName;
          if (data === UP || data === ACTIVE || data === RECONNECTED) {
            pendingIndexByKey[data] = index;
          } else if (data === DOWN || data === DOWN_ERROR) {
            // Letting RECONNECTED cancel UP as well was attempted and is intentionally left out.
            cancelPair(list, index, UP, data);
          } else if (data === INACTIVE) {
            cancelPair(list, index, ACTIVE, data);
          } else {
            LOG_ERROR(`Unexpected event in post-event action: ${data}`);
          }
        } else if (type === startDispatchActionType) {
          pendingIndexByKey[type] = index;
        } else if (type === stopDispatchActionType) {
          cancelPair(list, index, startDispatchActionType, type);
        }
      });
    }

    /**
     * Executes the queued post-event actions in order, then empties the queue.
     *
     * Registered with the FSM through fsm.setPostEventAction() by addPostEventAction(). Before an
     * action is executed, any part of the queue that has not been pre-processed yet is passed to
     * preprocessPostEventActions() so that mutually cancelling actions are nulled out first.
     *
     * NOTE(review): the start-dispatch branch calls `this.requestStartDispatchFSM()`. This is a
     * plain function, so `this` is whatever the invoker of the registered callback binds; confirm
     * that the state machine calls it with the FSM as receiver.
     *
     * @private
     */
    function processPostEventActions() {
      // Queue length at the time of the last pre-processing pass.
      let preprocessedActions = 0;
      let i;

      // Execute the remaining non-null entries.  We don't use a foreach loop since foreach won't
      // include elements added after processing has started.  This can occur if a callback causes
      // more postEventActions to be added.
      LOG_TRACE('Executing post event action list', postEventActions);
      for (i = 0; i < postEventActions.length; ++i) {
        // The queue grew since the last pass (or this is the first iteration): pre-process again,
        // starting at the current position.
        if (preprocessedActions < postEventActions.length) {
          preprocessPostEventActions(i);
          preprocessedActions = postEventActions.length;
        }
        const action = postEventActions[i];
        LOG_TRACE(`Action ${i}:`, action);
        if (!action) {
          // Cancelled during pre-processing.
          continue;
        }
        switch (action.type) {
          case emitEventActionType:
            // Only pass the error argument through when one was supplied.
            if (action.error !== undefined) {
              consumer._emit(action.data, action.error);
            } else {
              consumer._emit(action.data);
            }
            break;

          case stopDispatchActionType:
            // We don't actually do the stop here -- it is always done when this is enqueued.
            // The reason for enqueuing this event is so that it can cancel a startDispatch, if
            // present.
            break;

          case startDispatchActionType:
            this.requestStartDispatchFSM();
            break;

          default:
            LOG_ERROR(`Unhandled post event action type: ${action.type}`);
            break;
        }
      }
      postEventActions = [];
    }

    /**
     * Appends an action to the post-event queue. When the queue goes from empty to one entry,
     * processPostEventActions is registered with the FSM as its post-event action.
     *
     * @param {string} type - action type (emit / start dispatch / stop dispatch)
     * @param {*} [data] - payload; for emit actions, the event name. When omitted, the stored
     *   action carries only `type` and `error` is dropped.
     * @param {*} [error] - optional error stored alongside `data`
     * @private
     */
    function addPostEventAction(type, data, error) {
      const newEvent = (data === undefined) ? { type } : { type, data, error };
      const queuedCount = postEventActions.push(newEvent);
      LOG_TRACE(`Added post event action ${queuedCount}:`, newEvent);
      if (queuedCount === 1) {
        fsm.setPostEventAction(processPostEventActions);
      }
    }

    // Queue a consumer event to be emitted after the current FSM event has been processed.
    // ACTIVE and INACTIVE are dropped (with a trace) when flow active indications are disabled in
    // the consumer properties.
    const addEventToEmit = (event, error) => {
      const isActiveIndication = (event === MessageConsumerEventName.INACTIVE) ||
                                 (event === MessageConsumerEventName.ACTIVE);
      if (isActiveIndication && !properties.activeIndicationEnabled) {
        LOG_TRACE(`Skip emitting ${event} due to disabled flow active indications`);
        return;
      }
      addPostEventAction(emitEventActionType, event, error);
    };
    // Queue a start-dispatch request; it is carried out when the post-event actions are processed,
    // unless a later stop-dispatch action cancels it during pre-processing.
    const requestStartDispatch = () => {
      addPostEventAction(startDispatchActionType);
    };
    // Stop dispatching immediately and queue a marker action for the stop.
    const requestStopDispatch = () => {
      // We do the stop inline, then add a post-event action so that it can have the effect of
      // cancelling a previous start.  It will not execute the stop when processed however since we
      // have already done it here.
      // `this` is the FSM here: arrow functions keep the constructor's `this`.
      this.requestStopDispatchFSM();
      addPostEventAction(stopDispatchActionType);
    };

    // Expose the helpers as FSM members so they are reachable outside this constructor's closure.
    this._addEventToEmit = addEventToEmit;
    this._requestStartDispatch = requestStartDispatch;
    this._requestStopDispatch = requestStopDispatch;

    // End of post-event action infrastructure

    // Utility functions
    /**
     * Emits CONNECT_FAILED_ERROR on the consumer right away (not through the post-event queue).
     *
     * @param {OperationError} error - failure reason; asserted to be an OperationError
     * @private
     */
    function emitBindFailed(error) {
      const isOperationError = error instanceof OperationError;
      assert(isOperationError);
      const { CONNECT_FAILED_ERROR } = MessageConsumerEventName;
      consumer._emit(CONNECT_FAILED_ERROR, error);
    }

    /**
     * Emits DOWN_ERROR on the consumer right away (not through the post-event queue).
     *
     * @param {OperationError} error - failure reason; asserted to be an OperationError
     * @private
     */
    function emitRebindFailed(error) {
      const isOperationError = error instanceof OperationError;
      assert(isOperationError);
      const { DOWN_ERROR } = MessageConsumerEventName;
      consumer._emit(DOWN_ERROR, error);
    }

    /**
     * Checks that a topic-endpoint subscription using shared-subscription features (its
     * subscription info has `isShare` or `isNoExport` set) is supported by the session.
     *
     * @param {Object} destination - subscription destination; a falsy value passes the check
     * @returns {?OperationError} SHARED_SUBSCRIPTIONS_NOT_SUPPORTED error when the session lacks
     *   the SHARED_SUBSCRIPTIONS capability for such a subscription, otherwise null
     * @private
     */
    function verifyEndpointSubscription(destination) {
      if (!destination) {
        return null;
      }
      const subscriptionInfo = destination.getSubscriptionInfo();
      const usesSharedFeatures =
        subscriptionInfo && (subscriptionInfo.isShare || subscriptionInfo.isNoExport);
      if (!usesSharedFeatures) {
        return null;
      }
      // The capability is only queried once the subscription is known to need it.
      if (fsm._sessionInterface.isCapable(CapabilityType.SHARED_SUBSCRIPTIONS)) {
        return null;
      }
      return new OperationError('Shared Subscriptions not Supported',
                                ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED);
    }

    /**
     * Checks that the session supports negative acknowledgements when the consumer requires a
     * FAILED or REJECTED settlement outcome.
     *
     * @param {Array} requiredSettlementOutcomes - MessageOutcome values required by the consumer;
     *   a missing or empty list passes the check
     * @returns {?OperationError} INVALID_OPERATION error (also logged as a warning) when a NACK
     *   outcome is required but the session lacks AD_APP_ACK_FAILED, otherwise null
     * @private
     */
    function verifyNackSettlementOutcomesSupport(requiredSettlementOutcomes) {
      let error = null;
      const settlementOutcomes = requiredSettlementOutcomes;
      if (settlementOutcomes && settlementOutcomes.length > 0) {
        // Only FAILED and REJECTED need NACK support. Each element must be compared against both
        // values explicitly: `v === FAILED || REJECTED` would be truthy for every element.
        const isNackRequired = settlementOutcomes.some(
          v => v === MessageOutcome.FAILED || v === MessageOutcome.REJECTED);
        if (isNackRequired && !fsm._sessionInterface.isCapable(CapabilityType.AD_APP_ACK_FAILED)) {
          const outcomeNotSupportedError = `Session.capabilitySettlementOutcomeNotSupported: [ ${
            settlementOutcomes.map(v => MessageOutcome.nameOf(v)).join(', ')
          } ]`;
          // log the operation error as a Warning
          LOG_WARN(outcomeNotSupportedError);
          // then create the actual OperationError object here
          error = new OperationError(outcomeNotSupportedError, ErrorSubcode.INVALID_OPERATION);
        }
      }
      return error;
    }
    //**********************************************************************************************
    // ChoicePoints on transitions.
    // Not every transition is unconditional. In some cases, particularly with session-up/flow-open,
    // we want to check the session capabilities, which will affect which transition is taken.
    // The following choicePoints are NOT generic; they may only be used in the documented states
    // to control the transition out of that state.
    // ChoicePoints must return a new state (this.transitionTo())
    //
    // checkCapabilitiesChoicePoint - Check the session capabilities against the consumer properties.
    // If all requested properties are supported by the session capabilities, transitionTo(BindSent)
    // (or CreateSent when a durable endpoint is to be created if missing).
    // Otherwise emit the bind-failed error and transitionTo(Unbound).
    // Used in:  fsm.Unbound.AwaitFlowOpen
    //           fsm.Unbound.AwaitSessionUp
    //
    /**
     * Choice point taken before the first bind: validates the consumer properties against the
     * session capabilities and picks the next state.
     *
     * @param {State} curState - state being left; its transitionTo() result is returned
     * @returns {*} transition to Unbound (emitting CONNECT_FAILED_ERROR) on a capability error,
     *   to CreateSent when a durable endpoint is to be created if missing, otherwise to BindSent
     * @private
     */
    function checkCapabilitiesChoicePoint(curState) {
      // Only the first applicable check runs; the three checks are mutually exclusive.
      // NOTE(review): as a consequence the settlement-outcome (NACK) check is skipped whenever a
      // topicEndpointSubscription is configured -- confirm that this is intended.
      const findCapabilityError = () => {
        const replayRequested = fsm._replayStartLocation !== undefined;
        if (replayRequested && !fsm._sessionInterface.isCapable(CapabilityType.MESSAGE_REPLAY)) {
          return new OperationError('Message Replay Not Supported',
                                    ErrorSubcode.REPLAY_NOT_SUPPORTED);
        }
        if (properties.topicEndpointSubscription) {
          return verifyEndpointSubscription(properties.topicEndpointSubscription);
        }
        if (properties.requiredSettlementOutcomes) {
          return verifyNackSettlementOutcomesSupport(properties.requiredSettlementOutcomes);
        }
        return null;
      };

      const error = findCapabilityError();
      if (error) {
        // Going back to Unbound means waiting for both SessionUp and flow open again, which is
        // rare. It fits here because this session is known to be incapable, while the next one
        // might not be.
        return curState.transitionTo(fsm.Unbound, () => emitBindFailed(error));
      }

      // OK to bind; a durable endpoint flagged createIfMissing is created first.
      const mustCreateEndpoint = properties.createIfMissing &&
        properties.queueDescriptor &&
        properties.queueDescriptor.durable;
      return curState.transitionTo(mustCreateEndpoint ? fsm.CreateSent : fsm.BindSent);
    }

    /**
     * Choice point taken before a re-bind while reconnecting. Only the settlement-outcome (NACK)
     * capability is re-checked here.
     *
     * @param {State} curState - state being left
     * @param {boolean} [externalTransition=false] - when true, the move to RBindSent is made with
     *   externalTransitionTo() instead of transitionTo()
     * @returns {*} transition to Unbound (emitting CONNECT_FAILED_ERROR) when the required
     *   settlement outcomes are not supported, otherwise the transition to Reconnecting.RBindSent
     * @private
     */
    function checkCapabilitiesChoicePointOnRebind(curState, externalTransition = false) {
      const requiredOutcomes = properties.requiredSettlementOutcomes;
      const error = requiredOutcomes
        ? verifyNackSettlementOutcomesSupport(requiredOutcomes)
        : null;

      if (error) {
        // Going back to Unbound means waiting for both SessionUp and flow open again, which is
        // rare. It fits here because this session is known to be incapable, while the next one
        // might not be.
        return curState.transitionTo(fsm.Unbound, () => emitBindFailed(error));
      }

      // OK to re-bind
      const rebindState = fsm.Reconnecting.RBindSent;
      return externalTransition
        ? curState.externalTransitionTo(rebindState)
        : curState.transitionTo(rebindState);
    }
    // End of ChoicePoints
    //*********************************************************************************************
    // FSM config

    /**
     * Closes a flow that was bound by an unexpected (presumably duplicated) BIND response.
     *
     * Sends a close-message-consumer request for the flow ID carried by `msg` and registers a
     * pending request for it. The response is ignored; if the request times out, the close is
     * sent again, repeating until a response arrives.
     *
     * @param {Object} msg - the unexpected bind response; `msg.flowId` identifies the flow to close
     * @private
     */
    function handleAccidentalBind(msg) {
      LOG_TRACE(`handleAccidentalBind mssg: ${msg}`);
      const correlationTag = sessionInterface.getCorrelationTag();
      const message = SMFLib.AdProtocolMessage.getCloseMessageConsumer(msg.flowId, correlationTag);
      sessionInterface.sendControl(message);
      sessionInterface.enqueueRequest(
        correlationTag,
        // Retry through the closure, not through `this`: this function is invoked as a plain call
        // inside a class body (strict mode), so `this` is undefined here.
        () => handleAccidentalBind(msg),
        properties.connectTimeoutInMsecs,
        null,
        null); // ignore response. Maybe log?
    }

    // Fallback reaction for events that no state handles.
    // NOTE(review): the handler uses both `this` and `fsm`; presumably the state machine invokes
    // it with the FSM as `this` -- confirm against the StateMachine implementation.
    this.unhandledEventReaction(function onUnhandledEvent(event) {
      switch (event.getName()) {
        case ConsumerFSMEventNames.VIRTUALROUTER_NAME_CHANGED:
          LOG_INFO('VirtualRouter name change: clearing all acknowledgement state and partition group ID, if any');
          fsm._resetRemoteConnectionState();
          return this;
        case ConsumerFSMEventNames.FLOW_UNBOUND:
          // Unsolicited unbind: move to Unbound and report DOWN_ERROR with the event details.
          LOG_INFO('Received unsolicited unbind. Flow may be manually reconnected.');
          return this.transitionToUnbound(MessageConsumerEventName.DOWN_ERROR, event.details);
        case ConsumerFSMEventNames.DISPOSE:
          // Release FSM resources, then terminate whatever state is current.
          fsm._dispose();
          return fsm.getCurrentState().terminate();
        case ConsumerFSMEventNames.BIND_RESPONSE:
          // A bind response nobody was waiting for: close the flow it refers to.
          LOG_TRACE('Unhandled bind, assuming accidental BIND duplication');
          handleAccidentalBind(event.details);
          return this;
        default:
          LOG_TRACE(`Ignoring event ${event.getName()} in state ${this.getCurrentState().getName()}`);
          return this;
      }
    });

    // Initial transition of the state machine: enter Unbound and log the machine's name.
    this.initial(function onInitial() {
      return this.transitionTo(fsm.Unbound, (context) => {
        LOG_INFO(`Starting ${context.getStateMachine().getName()}`);
      });
    });

    /*
      We don't transition directly from Unbound to BindSent.
      We need the following events:
       * FLOW_OPEN
       * SESSION_UP
      (The session ensures we get SESSION_UP or SESSION_UP_NO_AD if we are created
      and there is a session already up.)
      The Unbound base state implements transitions to its wait state children:
       * FLOW_OPEN -> AwaitSessionUp
       * SESSION_UP -> AwaitFlowOpen
      It also has the following transitions that pull the FSM out of its children
      and back to itself:
       * FLOW_CLOSE
       * SESSION_DOWN
      The child states can override these with internal transitions.
     */
    // Unbound base state. The second constructor argument supplies extension methods that the
    // reactions below call through `this`.
    this.Unbound = new State({
      name:          ConsumerStateNames.UNBOUND,
      parentContext: fsm,
    }, {
      // Tell the application that guaranteed messaging is not available on this session.
      emitDisabledEvent() {
        consumer._emit(MessageConsumerEventName.GM_DISABLED);
      },
    })
      // FLOW_CLOSE, SESSION_DOWN and SESSION_DISCONNECT re-enter Unbound itself, which pulls the
      // FSM out of any child wait state.
      .reaction(ConsumerFSMEventNames.FLOW_CLOSE, function onFlowClose() {
        return this.transitionTo(this);
      })
      .reaction(ConsumerFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.transitionTo(this);
      })
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSesssionDisconnect() {
        return this.transitionTo(this);
      })
      .reaction(ConsumerFSMEventNames.FLOW_OPEN, function onFlowOpen() {
        return this.transitionTo(fsm.Unbound.AwaitSessionUp); // Enter child state
      })
      .reaction(ConsumerFSMEventNames.SESSION_UP, function onSessionUp() {
        // Only wait for FLOW_OPEN when the session can consume guaranteed messages at all.
        if (fsm._sessionInterface.isCapable(CapabilityType.GUARANTEED_MESSAGE_CONSUME)) {
          return this.transitionTo(fsm.Unbound.AwaitFlowOpen); // Enter child state
        }
        LOG_WARN(`Consumer is not supported by router for this client on sessionId 0x${
                 fsm._sessionInterface.sessionIdHex}`);
        // Act as if the child state handled this
        return this.internalTransition(() => this.emitDisabledEvent());
      })
      .reaction(ConsumerFSMEventNames.SESSION_UP_NO_AD, function onSessionUpNoAD() {
        // Act as if the child state handled this
        return this.internalTransition(() => this.emitDisabledEvent());
      })
      // Leaving Unbound starts a fresh bind cycle: restore the configured number of attempts.
      .exit(() => {
        fsm._connectAttempts = properties.connectAttempts;
      });

    /*
      In this state, we have received FLOW_OPEN, but no SESSION_UP.
      SESSION_DOWN does not cancel this wait, so we must override the
      parent and explicitly do nothing.
    */
    // Child of Unbound: the flow has been opened and the FSM is waiting for SESSION_UP.
    this.Unbound.AwaitSessionUp = new State({
      name:          ConsumerStateNames.UNBOUND_AWAIT_SESSION_UP,
      parentContext: this.Unbound,
    }, {
      // Emit the private BIND_WAITING flow event on the consumer.
      emitBindWaiting() {
        consumer._emit(PrivateFlowEventName.BIND_WAITING);
      },
    })
      .entry(function onEntry() {
        this.emitBindWaiting();
      })
      // Override the parent's reactions so that losing the session does not leave this state.
      .reaction(ConsumerFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.internalTransition(); // Don't exit; continue waiting
      })
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSessionDown() {
        return this.internalTransition(); // Don't exit; continue waiting
      })
      // Both prerequisites are now met: check capabilities and pick the next state.
      .reaction(ConsumerFSMEventNames.SESSION_UP, function onSessionUp() {
        return checkCapabilitiesChoicePoint(this);
      });

    /*
      In this state, we have received SESSION_UP but not FLOW_OPEN.
      If we get FLOW_OPEN, attempt to bind. Any other event is handled
      by the parent.
     */
    // Child of Unbound: the session is up and the FSM is waiting for FLOW_OPEN.
    this.Unbound.AwaitFlowOpen = new State({
      name:          ConsumerStateNames.UNBOUND_AWAIT_FLOWOPEN,
      parentContext: this.Unbound,
    })
      // Both prerequisites are now met: check capabilities and pick the next state.
      .reaction(ConsumerFSMEventNames.FLOW_OPEN, function onFlowOpen() {
        return checkCapabilitiesChoicePoint(this);
      });
    // Factored out the extension object from the State constructor for reuse in
    // flow auto-reconnect sister-states.
    this.BindSentExtensions = {
      /**
       * Builds and sends the open-message-consumer (bind) request, then registers a pending
       * request under the same correlation tag with this object's handleBindTimeout and
       * handleBindResponse as its timeout and response callbacks.
       */
      sendBindRequest() {
        const correlationTag = sessionInterface.getCorrelationTag();
        const transportAcks = fsm._transportAcks;

        // Make sure fsm._endpoint / fsm._subscription are populated before they are read below.
        fsm._endpointEnsure();
        const endpoint = fsm._endpoint;
        const subscription = fsm._subscription;

        const message = SMFLib.AdProtocolMessage.getOpenMessageConsumer(
          properties.queueDescriptor,
          properties.queueProperties,
          endpoint,
          subscription,
          correlationTag,
          properties.windowSize,
          properties.noLocal,
          properties.activeIndicationEnabled,
          transportAcks.lastAcked,
          transportAcks.lastReceived,
          properties.browser,
          fsm._replayStartLocation,
          consumer.endpointErrorId,
          consumer.partitionGroupId,
          // Truthy only when at least one settlement outcome is required; may be undefined.
          (properties.requiredSettlementOutcomes && properties.requiredSettlementOutcomes.length > 0));
        sessionInterface.sendControl(message);
        sessionInterface.enqueueRequest(correlationTag,
                                        this.handleBindTimeout.bind(this),
                                        properties.connectTimeoutInMsecs,
                                        null,
                                        this.handleBindResponse.bind(this));
        // Trace the request arguments; the last entry is the raw outcome list rather than the
        // flag that was actually sent.
        LOG_TRACE('Sent consumer bind request with arguments', [
          properties.queueDescriptor,
          properties.queueProperties,
          endpoint,
          subscription,
          correlationTag,
          properties.windowSize,
          properties.noLocal,
          properties.activeIndicationEnabled,
          transportAcks.lastAcked,
          transportAcks.lastReceived,
          properties.browser,
          fsm._replayStartLocation,
          consumer.endpointErrorId,
          consumer.partitionGroupId,
          properties.requiredSettlementOutcomes]);
      },
      // Cancel the bind timer stored on this state as `bindRequestTimer` (set by the bind states'
      // entry actions).
      cancelBindRequestTimer() {
        LOG_TRACE('Cancelling bindRequestTimer.');
        this.bindRequestTimer.cancel();
      },
      // Timeout callback: feed a BIND_TIMEOUT event into the FSM. Does not use `this`, so it can
      // be passed around unbound.
      handleBindTimeout() {
        LOG_INFO('Bind timeout');
        fsm.processEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.BIND_TIMEOUT }));
      },
      handleExpectedBind(msg) {
        const accessTypeOrDefault = x => (x === undefined ? QueueAccessType.EXCLUSIVE : x);

        // the partition group ID
        let partitionGroupIdValue = msg.getPartitionGroupId();
        if(partitionGroupIdValue == undefined || partitionGroupIdValue == null) {
          partitionGroupIdValue = undefined; // set it as undefined
          fsm._clearPartitionGroupId(); // clear the partitionGroupId
        }

        const attrs = {
          lastMsgIdAcked:       msg.getLastMsgIdAcked(),
          flowId:               msg.getFlowId(),
          accessType:           accessTypeOrDefault(msg.getAccessType()),
          topicEndpointBytes:   msg.getTopicEndpointBytes(),
          grantedPermissions:   msg.getGrantedPermissions(),
          allOthersPermissions: msg.getAllOthersPermissions(),
          respectsTTL:          msg.getRespectsTTL(),
          activeFlow:           msg.getActiveFlow(),
          wantFlowChangeNotify: msg.getWantFlowChangeNotify(),
          discardBehavior:      msg.getQueueDiscardBehavior(),
          deliveryCountSent:    msg.getEndpointDeliveryCountSent(),
          endpointId:           msg.getEndpointId(),
          maxUnackedMessages:   msg.getMaxUnackedMessages(),
          endpointErrorId:      msg.getEndpointErrorId(),
          spoolerUniqueId:      msg.getSpoolerUniqueId(),
          quota:                msg.getQuota(),
          maxMsgSize:           msg.getMaxMsgSize(),
          maxRedelivery:        msg.getMaxRedelivery(),
          partitionGroupId:     partitionGroupIdValue,
        };
        LOG_DEBUG('BIND response attributes:', attrs);

        Object.assign(consumer, {
          accessType:           attrs.accessType,
          queueDiscardBehavior: attrs.discardBehavior,
          deliveryCountSent:    attrs.deliveryCountSent,
          endpointId:           attrs.endpointId,
          respectsTTL:          attrs.respectsTTL,
          flowId:               attrs.flowId,
          permissions:          attrs.grantedPermissions,
          wantFlowChangeNotify: attrs.wantFlowChangeNotify,
          endpointErrorId:      attrs.endpointErrorId,
          spoolerUniqueId:      attrs.spoolerUniqueId,
          partitionGroupId:     partitionGroupIdValue,
        });
        if (fsm._sessionInterface.isCapable(CapabilityType.BR_REPLAY_ERRORID)) {
          consumer.endpointErrorId = attrs.endpointErrorId;
        }

        if (attrs.topicEndpointBytes && attrs.topicEndpointBytes.length) {
          attrs.endpoint = DestinationFromNetwork.createDestinationFromBytes(
            attrs.topicEndpointBytes);
          LOG_DEBUG('Overwriting local endpoint:', fsm._endpoint, 'with remote:', attrs.endpoint);
          fsm._endpoint = attrs.endpoint;
          const consumerProperties = fsm._consumer._properties;
          consumerProperties.queueDescriptor = new QueueDescriptor({
            name:    attrs.endpoint.name,
            type:    consumerProperties.queueDescriptor.type,
            durable: consumerProperties.queueDescriptor.durable,
          });
        }

        const consumerProperties = fsm._consumer._properties;
        consumerProperties.queueProperties = new QueueProperties({
          respectsTTL:          attrs.respectsTTL,
          permissions:          attrs.allOthersPermissions,
          quotaMB:              attrs.quota,
          maxMessageSize:       attrs.maxMsgSize,
          discardBehavior:      attrs.discardBehavior,
          maxMessageRedelivery: attrs.maxRedelivery,
          accessType:           attrs.accessType,
        });
        if (!consumerProperties.queueProperties.permissions) {
          consumerProperties.queueProperties.permissions = QueuePermissions.NONE;
        }


        Object.assign(fsm, {
          _active:           attrs.activeFlow,
          _remoteWindowSize: attrs.maxUnackedMessages,
        });

        if (Long.UZERO.eq(fsm._transportAcks.lastAcked)) {
          fsm._transportAcks.lastAcked = attrs.lastMsgIdAcked || Long.UZERO;
        } else {
          LOG_DEBUG(`Retaining last acked/last received: ${fsm._transportAcks}`);
        }
        //return this.transitionTo(fsm.FlowUp);
        //return fsm.processEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.FLOW_UP }));
      },
      handleBindResponse(msg) {
        LOG_TRACE(`handleBindResponse called for message ${msg}`);
        if (msg.msgType !== SMFLib.SMFAdProtocolMessageType.BIND) {
          LOG_INFO(`Unexpected message type in bind response: ${SMFLib.SMFAdProtocolMessageType.describe(msg.msgType)}`);
          return fsm.processEvent(
            new ConsumerFSMEvent(
              { name: ConsumerFSMEventNames.FLOW_FAILED },
              new OperationError(`Unexpected bind response: ${
                                 SMFLib.SMFAdProtocolMessageType.describe(msg.msgType)}`,
                                 ErrorSubcode.PROTOTOCOL_ERROR)));
        }

        const header = msg.smfHeader;
        const responseCode = header.pm_respcode;

        if (responseCode === null) {
          // Drop message and increment stats. Flow will time out
          this._consumer.incStat(StatType.RX_DISCARD_SMF_UNKNOWN_ELEMENT);
          LOG_DEBUG(`Drop Open-Publisher-Flow Request message on sessionId 0x${
                    this._sessionInterface.sessionIdHex}`);
          return undefined;
        }

        if (responseCode !== 200) {
          const description = header.pm_respstr;
          const errorSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(responseCode,
                                                                            description);
          LOG_INFO('Flow failed (bind):', responseCode, description, ErrorSubcode.describe(errorSubcode));
          LOG_DEBUG('subcode:', errorSubcode);
          return fsm.processEvent(new ConsumerFSMEvent(
              { name: ConsumerFSMEventNames.FLOW_FAILED },
              new OperationError(description, errorSubcode, { responseCode })));
        }

        const respEvent = { name: ConsumerFSMEventNames.BIND_RESPONSE };
        return fsm.processEvent(new ConsumerFSMEvent(respEvent, msg));
      },
    };
    // BindSent: a bind request is outstanding and the FSM waits for its outcome.
    this.BindSent = new State({
      name:          ConsumerStateNames.BIND_SENT,
      parentContext: fsm,
    }, this.BindSentExtensions)
      .entry(function onEntry() {
        // Each entry consumes one connect attempt, sends the bind and starts the bind timer.
        fsm._connectAttempts--;
        this.sendBindRequest();
        this.bindRequestTimer = Timer.newTimeout(properties.connectTimeoutInMsecs,
                                                 this.handleBindTimeout);
      })
      // Session lost while binding: go back to waiting for the session; a disconnect additionally
      // queues a DOWN event for the application.
      .reaction(ConsumerFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.transitionTo(fsm.Unbound.AwaitSessionUp);
      })
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSessionDown() {
        return this.transitionTo(fsm.Unbound.AwaitSessionUp,
                                 () => fsm._addEventToEmit(MessageConsumerEventName.DOWN));
      })
      .reaction(ConsumerFSMEventNames.FLOW_CLOSE, function onFlowClose() {
        return this.transitionTo(fsm.UnbindSent);
      })
      .reaction(ConsumerFSMEventNames.BIND_TIMEOUT, function onBindTimeout() {
        LOG_TRACE(`Bind timeout, connectAttempts left: ${fsm._connectAttempts}`);
        // Retry by re-entering this state (external transition, so entry runs again) while
        // attempts remain; otherwise report the failure and wait for a new FLOW_OPEN.
        if (fsm._connectAttempts > 0) {
          return this.externalTransitionTo(fsm.BindSent);
        }
        return this.transitionTo(fsm.Unbound.AwaitFlowOpen,
                                 () => emitBindFailed(
                                    new OperationError('Bind failed due to timeout',
                                                       ErrorSubcode.TIMEOUT)));
      })
      .reaction(ConsumerFSMEventNames.FLOW_FAILED, function onFlowFailed(evt) {
        // Act like a newly created flow
        return this.transitionTo(fsm.Unbound.AwaitFlowOpen, () => emitBindFailed(evt.details));
      })
      .reaction(ConsumerFSMEventNames.BIND_RESPONSE, function onBindResponse(event) {
        // Apply the response attributes, then move to FlowUp.
        this.handleExpectedBind(event.details);
        //return this;
        return this.transitionTo(fsm.FlowUp);
      })
      .reaction(ConsumerFSMEventNames.FLOW_UP, function onFlowUp() {
        return this.transitionTo(fsm.FlowUp);
      })
      // The bind timer is cancelled whenever this state is left.
      .exit(function onExit() {
        this.cancelBindRequestTimer();
      });
    // Reconnecting: parent state of the flow auto-reconnect sub-states.
    this.Reconnecting = new State({
      name:          ConsumerStateNames.RECONNECTING,
      parentContext: fsm,
    })
      .entry(function onEntry() {
        LOG_TRACE('RECONNECTING onEntry: emitting RECONNECTING, resetting attempt counters');
        // Both values are traced; the one actually emitted below is the FSM's.
        LOG_TRACE(`this._errorCausingReconnect: ${this._errorCausingReconnect}`);
        LOG_TRACE(`fsm._errorCausingReconnect: ${fsm._errorCausingReconnect}`);
        consumer._emit(MessageConsumerEventName.RECONNECTING, fsm._errorCausingReconnect);
        fsm._connectAttempts = properties.connectAttempts;
        fsm.reconnectAttempts = properties.reconnectAttempts;
      })
      // Initial transition: re-check capabilities, then (normally) enter RBindSent.
      .initial(
        function onInitial() {
          return checkCapabilitiesChoicePointOnRebind(this, false);
          // return this.transitionTo(fsm.Reconnecting.RBindSent);
        })
      // Losing the session while reconnecting parks the FSM in RAwaitSessionUp.
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSessionDown() {
        return this.transitionTo(fsm.Reconnecting.RAwaitSessionUp);
      })
      .reaction(ConsumerFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.transitionTo(fsm.Reconnecting.RAwaitSessionUp);
      });
    // RECONNECTING / AWAIT_SESSION_UP: waits for SESSION_UP, the only event
    // this substate reacts to itself, before trying to rebind.
    this.Reconnecting.RAwaitSessionUp = new State({
      name:          ConsumerStateNames.RECONNECTING_AWAIT_SESSION_UP,
      parentContext: this.Reconnecting,
    })
      .reaction(ConsumerFSMEventNames.SESSION_UP, function onSessionUp() {
        // A fresh session gets a fresh bind-attempt budget.
        fsm._connectAttempts = properties.connectAttempts;
        return checkCapabilitiesChoicePointOnRebind(this, false);
        // return this.transitionTo(fsm.Reconnecting.RBindSent);
      });
    // RECONNECTING / BIND_SENT: a rebind request is outstanding.
    // The helper methods used below (sendBindRequest, handleBindTimeout,
    // handleExpectedBind, cancelBindRequestTimer) presumably come from
    // this.BindSentExtensions, which is defined outside this excerpt.
    this.Reconnecting.RBindSent = new State({
      name:          ConsumerStateNames.RECONNECTING_BIND_SENT,
      parentContext: this.Reconnecting,
    }, this.BindSentExtensions)
      .entry(function onEntry() {
        // Each entry consumes one bind attempt and arms the bind timeout.
        fsm._connectAttempts--;
        this.sendBindRequest();
        this.bindRequestTimer = Timer.newTimeout(properties.connectTimeoutInMsecs,
                                                 this.handleBindTimeout);
      })
      .reaction(ConsumerFSMEventNames.FLOW_CLOSE, function onFlowClose() {
        // TODO: This is probably wrong in the original state machine too:
        // flowid is either missing or stale. Keeping it the same as there for now.
        return this.transitionTo(fsm.UnbindSent);
      })
      .reaction(ConsumerFSMEventNames.BIND_TIMEOUT, function onBindTimeout() {
        LOG_TRACE(`Bind timeout during reconnect, connectAttempts left: ${fsm._connectAttempts}`);
        if (fsm._connectAttempts > 0) {
          // Attempts remain: go back through the choice point to retry.
          return checkCapabilitiesChoicePointOnRebind(this, true);
          // return this.externalTransitionTo(fsm.Reconnecting.RBindSent);
        }
        // Running out of rebinds is a non-rebindable failure, so break the reconnect loop.
        LOG_TRACE('Running out of rebinds is a non rebindable failure, so break the reconect loop.');
        return this.transitionTo(fsm.Unbound.AwaitFlowOpen,
                                 () => emitBindFailed(
                                    new OperationError('Rebind failed due to timeout',
                                                       ErrorSubcode.TIMEOUT)));
      })
      .reaction(ConsumerFSMEventNames.FLOW_FAILED, function onFlowFailed(evt) {
        // These three reconnect-errors are "rebindable",
        // anything else breaks the reconnect loop.
        // reconnectAttempts === -1 is accepted alongside positive counts; it is
        // never decremented (see RAwaitTimer entry).
        if (fsm.reconnectAttempts > 0 || fsm.reconnectAttempts === -1) {
          if (evt && evt.details && evt.details.subcode) {
            if (evt.details.subcode === ErrorSubcode.QUEUE_SHUTDOWN ||
              evt.details.subcode === ErrorSubcode.TOPIC_ENDPOINT_SHUTDOWN ||
              evt.details.subcode === ErrorSubcode.GM_UNAVAILABLE) {
              LOG_TRACE(`ReconnectAttempts left: ${fsm.reconnectAttempts} subcode: ${evt.details.subcode}, giving it another shot.`);
              // Wait out the reconnect interval before the next rebind.
              return this.transitionTo(fsm.Reconnecting.RAwaitTimer);
            }
          } else {
            LOG_DEBUG(`Hmm, no subcode in FLOW_FAILED? ${evt}`);
          }
        } else {
          LOG_TRACE(`Ran out of reconnectAttempts: ${fsm.reconnectAttempts}`);
        }
        // Fall-through: non-rebindable subcode, missing subcode, or no attempts left.
        // NOTE(review): evt is dereferenced unguarded here although it is
        // null-checked above -- confirm evt is always defined.
        return this.transitionTo(fsm.Unbound.AwaitFlowOpen, () => emitRebindFailed(evt.details));
      })
      .reaction(ConsumerFSMEventNames.BIND_RESPONSE, function onBindResponse(event) {
        this.handleExpectedBind(event.details);
        //return this;
        // Successful rebind: go to FlowUp and queue RECONNECTED for the application.
        return this.transitionTo(fsm.FlowUp,
                                 () => fsm._addEventToEmit(MessageConsumerEventName.RECONNECTED));
      })
      .reaction(ConsumerFSMEventNames.FLOW_UP, function onFlowUp() {
        return this.transitionTo(fsm.FlowUp,
                                 () => fsm._addEventToEmit(MessageConsumerEventName.RECONNECTED));
                                 //() => consumer._emit(MessageConsumerEventName.RECONNECTING));
      })
      .exit(function onExit() {
        // Leaving for any reason cancels the bind timeout.
        this.cancelBindRequestTimer();
      });
    // RECONNECTING / AWAIT_TIMER: waits properties.reconnectIntervalInMsecs
    // between a rebindable FLOW_FAILED and the next rebind attempt.
    this.Reconnecting.RAwaitTimer = new State({
      name:          ConsumerStateNames.RECONNECTING_AWAIT_TIMER,
      parentContext: this.Reconnecting,
    }, {
      // Timer callback: feeds RECONNECT_INTERVAL_TIMEOUT back into the FSM.
      // It only references `fsm`, so it is safe to pass to the timer unbound.
      handleReconnectIntervalTimeout() {
        LOG_DEBUG('Reconnect interval timeout (expected)');
        fsm.processEvent(new ConsumerFSMEvent({
          name: ConsumerFSMEventNames.RECONNECT_INTERVAL_TIMEOUT }));
      },
      // Cancels the interval timer armed by the entry action.
      cancelReconnectIntervalTimer() {
        this.reconnectIntervalTimer.cancel();
      },

    }).entry(function onEntry() {
      // Consume one reconnect attempt; non-positive values (including -1) are
      // left untouched.
      if (fsm.reconnectAttempts > 0) {
        --fsm.reconnectAttempts;
      }
      LOG_TRACE(`Setting up timer for ${properties.reconnectIntervalInMsecs}. reconnectAttempts left: ${fsm.reconnectAttempts}`);
      this.reconnectIntervalTimer = Timer.newTimeout(properties.reconnectIntervalInMsecs,
                                                     this.handleReconnectIntervalTimeout);
    }).exit(function onExit() {
      this.cancelReconnectIntervalTimer();
    }).reaction(ConsumerFSMEventNames.RECONNECT_INTERVAL_TIMEOUT, function onTimeout() {
      // Interval elapsed: reset the bind-attempt budget and retry via the choice point.
      fsm._connectAttempts = properties.connectAttempts;
      return checkCapabilitiesChoicePointOnRebind(this, false);
      // return this.transitionTo(fsm.Reconnecting.RBindSent);
    });

    // FLOW_UP: composite state entered once the flow is bound. Its substates
    // (Xfer / XferInactive) are defined right after this statement.
    const flowUpFSM = this.FlowUp = new State({
      name:          ConsumerStateNames.FLOW_UP,
      parentContext: fsm,
    })
      .initial(
        function onInitial() {
          // Only an explicit _active === 0 starts in XferInactive; any other
          // value (including undefined) starts in Xfer.
          return this.transitionTo(fsm._active === 0
            ? flowUpFSM.XferInactive
            : flowUpFSM.Xfer);
        }
      )
      .entry(() => {
        fsm._replayStartLocation = undefined; //SOL-12945: only replay once.
        // This is ugly, but I can't coerce the post event action queue to work with me:
        // RECONNECTING should suppress UP.
        if (!fsm._errorCausingReconnect) {
          LOG_TRACE('Emitting UP, because there is no errorCausingReconnect');
          fsm._addEventToEmit(MessageConsumerEventName.UP);
        } else {
          LOG_TRACE('Not emitting UP, because there is errorCausingReconnect');
          // The reconnect is complete; clear the marker so the next entry emits UP again.
          fsm._errorCausingReconnect = null;
        }
      })
      // SESSION_DOWN goes to AwaitSessionUp without queuing DOWN, whereas
      // SESSION_DISCONNECT below queues DOWN on the transition.
      .reaction(ConsumerFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.transitionTo(fsm.Unbound.AwaitSessionUp);
      })
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSessionDisconnect() {
        return this.transitionTo(fsm.Unbound.AwaitSessionUp,
                                 () => fsm._addEventToEmit(MessageConsumerEventName.DOWN));
      })
      .reaction(ConsumerFSMEventNames.FLOW_CLOSE, function onFlowClose() {
        // Acks will be sent by UnbindSent
        return this.transitionTo(fsm.UnbindSent);
      })
      // Unbind while up: the choice point either enters Reconnecting or goes
      // to Unbound with DOWN_ERROR.
      .reaction(ConsumerFSMEventNames.FLOW_UNBOUND, event =>
        // Acks will be sent by UnbindSent
        fsm.transitionToUnboundFromUp(
          properties,
          MessageConsumerEventName.DOWN_ERROR,
          event.details)
      );

    // FLOW_UP / XFER: the flow is active. Entry queues ACTIVE, flushes acks
    // and requests dispatch start; exit queues INACTIVE and requests dispatch stop.
    flowUpFSM.Xfer = new State({
      name:          ConsumerStateNames.FLOW_UP_XFER,
      parentContext: flowUpFSM,
    })
      .entry(() => {
        fsm._addEventToEmit(MessageConsumerEventName.ACTIVE);
        fsm._sendAcks(true);
        fsm._requestStartDispatch();
      })
      .exit(() => {
        fsm._addEventToEmit(MessageConsumerEventName.INACTIVE);
        fsm._requestStopDispatch();
      })
      // Here we send acks on disconnect.  Acks for the FLOW_CLOSE event are coupled to the sending
      // of the unbind.
      .reaction(ConsumerFSMEventNames.SESSION_DISCONNECT, function onSessionDown() {
        fsm._sendAcks(true);            // send acks, then...
        return this.eventUnhandled();   // let the parent state handle the remaining steps.
      });

    // FLOW_UP / XFER_INACTIVE: the flow is bound but not active. FlowUp's
    // initial action selects this substate when fsm._active === 0.
    flowUpFSM.XferInactive = new State({
      name:          ConsumerStateNames.FLOW_UP_XFER_INACTIVE,
      parentContext: flowUpFSM,
    })
      // An active indication moves the flow to Xfer.
      .reaction(ConsumerFSMEventNames.FLOW_ACTIVE_IND, function onFlowActiveIndication() {
        return this.transitionTo(flowUpFSM.Xfer);
      });

    // UNBIND_SENT: an unbind (close consumer) request is outstanding.
    // Entry sends the request; a timeout re-enters the state (resending the
    // request); FLOW_UNBOUND moves the flow to Unbound and queues DOWN.
    this.UnbindSent = new State({
      name:          ConsumerStateNames.UNBIND_SENT,
      parentContext: fsm,
    }, {

      // Sends the unbind request and registers timeout/response handlers for it.
      // If sending throws, the flow is treated as already unbound.
      sendUnbindRequest() {
        // Don't do this on entry to UNBIND, do it when the user
        // wants to unbind

        fsm._endpointClear();
        try {
          const correlationTag = sessionInterface.getCorrelationTag();
          const message = SMFLib.AdProtocolMessage.getCloseMessageConsumer(consumer.flowId,
                                                                           correlationTag);
          sessionInterface.sendControl(message);
          sessionInterface.enqueueRequest(
            correlationTag,
            () => this.handleUnbindTimeout(),
            properties.connectTimeoutInMsecs,
            null,
            response => this.handleUnbindResponse(response));
          LOG_INFO('Sent consumer unbind request with arguments',
                   {
                     flowId: consumer.flowId,
                     correlationTag,
                   });
        } catch (e) {
          // Deliberate best effort: a send failure is taken to mean the session
          // is gone, so report the flow as unbound instead of propagating.
          LOG_INFO(`Exception in sendUnbindRequest while trying to send unbind request: ${e}`);
          LOG_DEBUG(`Session (${sessionInterface.getCurrentStateName()}) assumed lost before flow unbind could be sent. Considering this flow unbound.`);
          fsm.processEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.FLOW_UNBOUND }));
        }
      },

      // Request-timeout callback: raises UNBIND_TIMEOUT on the FSM.
      handleUnbindTimeout() {
        LOG_INFO('Unbind timeout');
        return fsm.processEvent(
          new ConsumerFSMEvent({ name: ConsumerFSMEventNames.UNBIND_TIMEOUT })
        );
      },

      // Response callback: records the endpoint error id from the response and
      // raises FLOW_UNBOUND carrying an OperationError built from the response
      // code and string.
      handleUnbindResponse(msg) {
        if (msg.msgType !== SMFLib.SMFAdProtocolMessageType.UNBIND) {
          // This handler processes the *unbind* response; the message used to
          // say "bind response", which was misleading in logs.
          LOG_INFO(`Unexpected message type in unbind response: ${SMFLib.SMFAdProtocolMessageType.describe(msg.msgType)}`);
        }
        const responseCode = msg.smfHeader.pm_respcode;
        const description = msg.smfHeader.pm_respstr;
        const errorSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(responseCode,
                                                                          description);
        consumer.endpointErrorId = msg.getEndpointErrorId();
        LOG_INFO('Flow failed (unbind):', responseCode, description, ErrorSubcode.describe(errorSubcode));
        return fsm.processEvent(new ConsumerFSMEvent(
          { name: ConsumerFSMEventNames.FLOW_UNBOUND },
          new OperationError(description, errorSubcode, responseCode)));
      },

    })
      .entry(function onEntry() {
        this.sendUnbindRequest();
      })
      .reaction(ConsumerFSMEventNames.UNBIND_TIMEOUT, function onUnbindTimeout() {
        // External self-transition: re-runs entry, which resends the unbind.
        return this.externalTransitionTo(fsm.UnbindSent);
      })
      // The choice point will emit on this transition, not on exit, which is OK:
      // we don't want to repeatedly dispatch "unbound" when the unbinds are timing out
      .reaction(ConsumerFSMEventNames.FLOW_UNBOUND,
                () => fsm.transitionToUnbound(MessageConsumerEventName.DOWN));


    // CREATE_SENT: an endpoint create request is outstanding.
    // Success, "already exists" and a timeout all proceed to BindSent; any
    // other failure returns to Unbound and reports a bind failure.
    this.CreateSent = new State({
      name:          ConsumerStateNames.CREATE_SENT,
      parentContext: fsm,
    }, {
      // Sends the create request built from the consumer's queue descriptor
      // and queue properties, and registers timeout/response handlers for it.
      sendCreateRequest() {
        const correlationTag = sessionInterface.getCorrelationTag();
        const message = SMFLib.AdProtocolMessage.getCreate(
          properties.queueDescriptor,
          properties.queueProperties,
          correlationTag);
        sessionInterface.sendControl(message);
        sessionInterface.enqueueRequest(
          correlationTag,
          this.handleCreateTimeout.bind(this),
          properties.connectTimeoutInMsecs,
          null,
          this.handleCreateResponse.bind(this));
      },
      // Request-timeout callback: raises CREATE_TIMEOUT on the FSM.
      handleCreateTimeout() {
        LOG_INFO('Create timeout');
        fsm.processEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.CREATE_TIMEOUT }));
      },
      // Response callback: maps the create response onto CREATE_SUCCESS or
      // CREATE_FAILED.
      handleCreateResponse(msg) {
        LOG_TRACE(`handleCreateResponse called for message ${msg}`);

        if (msg.msgType !== SMFLib.SMFAdProtocolMessageType.CREATE) {
          LOG_INFO(`Unexpected message type in create response: ${SMFLib.SMFAdProtocolMessageType.describe(msg.msgType)}`);
          // Fixed: the subcode was spelled PROTOTOCOL_ERROR, which is not the
          // documented ErrorSubcode member name.
          return fsm.processEvent(
            new ConsumerFSMEvent(
              { name: ConsumerFSMEventNames.CREATE_FAILED },
              new OperationError(`Unexpected create response: ${
                                 SMFLib.SMFAdProtocolMessageType.describe(msg.msgType)}`,
                                 ErrorSubcode.PROTOCOL_ERROR)));
        }

        const header = msg.smfHeader;
        const responseCode = header.pm_respcode;

        if (responseCode !== 200) {
          const description = header.pm_respstr;
          const errorSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(responseCode,
                                                                            description);
          LOG_INFO('Endpoint create failed:', responseCode, description, ErrorSubcode.describe(errorSubcode));
          LOG_DEBUG('subcode:', errorSubcode);
          if (errorSubcode === ErrorSubcode.ENDPOINT_ALREADY_EXISTS) {
            // An existing endpoint is as good as a freshly created one.
            LOG_TRACE('subcode recognized as ALREADY EXISTS:', errorSubcode);
            const respEvent = { name: ConsumerFSMEventNames.CREATE_SUCCESS };
            return fsm.processEvent(new ConsumerFSMEvent(respEvent, msg));
          }
          LOG_TRACE('subcode not recognized as ALREADY EXISTS:', errorSubcode);
          return fsm.processEvent(new ConsumerFSMEvent(
              { name: ConsumerFSMEventNames.CREATE_FAILED },
              new OperationError(description, errorSubcode, { responseCode })));
        }

        const respEvent = { name: ConsumerFSMEventNames.CREATE_SUCCESS };
        return fsm.processEvent(new ConsumerFSMEvent(respEvent, msg));
      },
    }
    )
      .entry(function onEntry() {
        //send create request
        this.sendCreateRequest();
      })
      .reaction(ConsumerFSMEventNames.CREATE_TIMEOUT, function onCreateTimeout() {
        LOG_DEBUG('Create timeout, just moving on to bind and hoping for the best.');
        return this.externalTransitionTo(fsm.BindSent);
      })
      .reaction(ConsumerFSMEventNames.CREATE_SUCCESS, function onCreateSuccess() {
        return this.externalTransitionTo(fsm.BindSent);
      })
      .reaction(ConsumerFSMEventNames.CREATE_FAILED, function onCreateFailed(evt) {
        // "Queue exists" is detected in the response handler, which raises
        // CREATE_SUCCESS instead, so only genuine failures arrive here.
        return this.transitionTo(fsm.Unbound.AwaitFlowOpen, () => emitBindFailed(evt.details));
      });
  }

  /**
   * Offers an incoming guaranteed message to this flow.
   *
   * In order, this: drops the message if FSM-side dispatch is off; normalizes
   * the delivery count and spooler unique id on the message; runs the transport
   * ack check (discarding duplicates and out-of-order messages); and finally
   * records the message in the application ack ring buffer, pushes it to the
   * dispatcher and sends or schedules acks.
   *
   * @param {solace.Message} message The message being accepted by the flow.
   * @returns {Boolean} False if the message was dropped or discarded. Otherwise
   *    the result of `_applicationAcks.insert()`, whose callback returns true
   *    (presumably insert() passes that through -- confirm).
   * @private
   */
  acceptMessage(message) {
    const { LOG_TRACE, LOG_DEBUG } = this.logger;
    const messageID = message.getGuaranteedMessageId();
    const idstr = messageID.toString();
    const consumer = this._consumer;

    if (!this._fsmDispatch) {
      // We're disconnecting. Drop the message.
      LOG_TRACE('Dropping message because this flow cannot acknowledge it');
      consumer.incStat(StatType.RX_DISCARD_NO_MATCHING_CONSUMER);
      return false;
    }
    // Booby trap getDeliveryCount() on the message if the flow doesn't support it:
    if (!consumer.deliveryCountSent) {
      message.setDeliveryCount(-1);
    }
    // The SMF flow suid (spooler unique id) can be updated via a data message
    // if there is a header parameter containing the suid. All subsequent data
    // messages without a suid must have the stored flow suid set on them.
    //
    // Update the flow with the message suid if and only if there was an
    // initialized suid from the flow bind response.
    // There is a legacy router case where suids are passed opaquely through the
    // network of brokers on the message. A suid present on a message without a
    // suid on the bind response is therefore not an update for subsequent
    // messages, because the currently connected broker does not support suid
    // generation or assignment.
    //
    // Note a spooler unique id is only set if the message SMF
    // contained the SpoolerUniqueId header parameter.
    // See decode for details.
    const messageSuid = message._getSpoolerUniqueId();
    if (RgmidFactory.INVALID_SUID.eq(messageSuid)) {
      // set the message suid value to stored consumer suid value
      message._setSpoolerUniqueId(consumer.spoolerUniqueId);
    } else if (consumer.spoolerUniqueId !== undefined
        && !RgmidFactory.INVALID_SUID.eq(consumer.spoolerUniqueId)) {
      // otherwise update the consumer with new suid value if consumer suid was initialized
      const consumerSuidStr = consumer.spoolerUniqueId.toString();
      const msgSuidStr = messageSuid.toString();
      LOG_DEBUG('Updating Consumer message spoolerUniqueId from ', consumerSuidStr,
                ' to ', msgSuidStr);
      consumer.spoolerUniqueId = messageSuid;
    } else if ((consumer.spoolerUniqueId === undefined
        || RgmidFactory.INVALID_SUID.eq(consumer.spoolerUniqueId))
        && !RgmidFactory.INVALID_SUID.eq(messageSuid)) {
      // clear the message spooler unique id on flows without initial spooler
      // unique id as rmid is not supported
      message._setSpoolerUniqueId(RgmidFactory.INVALID_SUID);
    }

    const transportAcks = this._transportAcks;
    const ackResult = transportAcks.tryReceive(messageID,
                                               message.getGuaranteedPreviousMessageId());
    // Evaluated after tryReceive(), so it reflects the pending count including
    // this message (assuming tryReceive updates acksPending -- confirm).
    const transportAckRequired = transportAcks.acksPending > this.maxPendingAcks;
    switch (ackResult) {
      case TransportAckResult.OK:
        break; // carry on
      case TransportAckResult.DUPLICATE:
        consumer.incStat(StatType.RX_DISCARD_DUPLICATE);
        // Two options here: could check membership in the ringbuffer,
        // or could check whether the ID is less than ringbuffer.front().
        // Untested assumption: arithmetic comparison on boxed Long at front()
        // is more computationally expensive than native Map() lookup.
        if (!this._applicationAcks.has(messageID) &&
            !this._oldUnacked.has(idstr)) {
          LOG_TRACE('Will application ack unknown duplicate ID', idstr);
          const dupAckRanges = new Map();
          dupAckRanges.set(MessageOutcome.ACCEPTED, [[messageID, messageID]]);
          this._sendAck(dupAckRanges); // send ack for duplicates as ACCEPTED outcome (broker would use previous settlement outcome for message)
        } else if (transportAckRequired) {
          this._sendAcks(transportAckRequired);
        } else {
          // If we are receiving dups we should make sure we send
          // an ack in case we don't receive any non-duplicates
          // and we don't receive enough duplicates to exceed
          // maxPendingAcks.  Eventually we would, but only after
          // the router retransmitted many times.
          this._setTransportAckTimer();
        }
        return false;
      case TransportAckResult.OUT_OF_ORDER:
        consumer.incStat(StatType.RX_DISCARD_OUT_OF_ORDER);
        return false;
      default:
        assert(false, 'Unhandled transport ack result', ackResult);
        return false;
    }

    // The callback receives the ring-buffer entry being evicted to make room,
    // if any (falsy otherwise).
    return this._applicationAcks.insert(messageID, (evicting) => {
      let applicationAckRequired = false;
      if (evicting) {
        switch (evicting.state) {
          case ApplicationAckState.UNACKED:
            // Still unacked by the application: remember its key so a late
            // settle can be acked immediately (see applicationSettle).
            this._oldUnacked.add(evicting.key);
            break;
          case ApplicationAckState.ACKED_NOT_SENT:
            // Acked by the application but not yet on the wire: force a send below.
            applicationAckRequired = true;
            break;
          case ApplicationAckState.ACKED_SENT:
            // OK to evict
            break;
          default:
            assert(false,
                   'Unhandled application ack state',
                   ApplicationAckState.describe(evicting.state));
        }
      }

      // _midDispatch brackets the push to the dispatcher; its readers are
      // outside this excerpt.
      this._midDispatch = true;
      this._messageDispatch.push(message);
      this._midDispatch = false;

      if (transportAckRequired || applicationAckRequired) {
        LOG_TRACE('Need to send acks:',
                  'transport', transportAckRequired,
                  'application', applicationAckRequired);
        this._sendAcks(transportAckRequired);
      } else {
        this._setTransportAckTimer();
      }

      return true;
    });
  }

  applicationAck(messageId, isAutoAcked = false) {
    // internally call the applicationSettle(messageId, MessageOutcome.ACCEPTED); 
    // since the implementations are the same
    this.applicationSettle(messageId, MessageOutcome.ACCEPTED, isAutoAcked);
  }

  applicationSettle(messageId, messageOutcome = MessageOutcome.ACCEPTED, isAutoAcked = false) {
    const { LOG_TRACE } = this.logger;

    const idstr = messageId.toString();

    // increment the correct stat for the outcome
    switch(messageOutcome) {
      case MessageOutcome.FAILED:
        LOG_TRACE('Settling the message with outcome as solace.MessageOutcome.FAILED');
        // count persistent and non-persistent settled failed msgs
        this._consumer.incStat(StatType.RX_SETTLE_FAILED);
        break;
      // case MessageOutcome.RELEASED:
      //   // no current support for RELEASED settlement Outcome
      //   break;
      case MessageOutcome.REJECTED:
        LOG_TRACE('Settling the message with outcome as solace.MessageOutcome.REJECTED');
        // count persistent and non-persistent settled rejected msgs
        this._consumer.incStat(StatType.RX_SETTLE_REJECTED);
        break;

      case MessageOutcome.ACCEPTED:
         // count persistent and non-persistent settled accepted msgs
        LOG_TRACE('Settling the message with outcome as solace.MessageOutcome.ACCEPTED');
        // count for manual calls on Message.acknowledge(); or Message.settle(outcome); methods
        // so that we don't count for auto-Acks
        if(!isAutoAcked) {
          this._consumer.incStat(StatType.RX_SETTLE_ACCEPTED); // count persistent and non-persistent settled accepted msgs
        }
        // increment the Acked stat for calls to accept() and auto-acks
        this._consumer.incStat(StatType.RX_ACKED); // count it as normal acknowledgement
        break;
    }

    // Was the message old, and demoted to _oldUnacked?
    if (this._oldUnacked.delete(idstr)) { // True if idstr was member
      // It was old. Ack immediatesly.
      LOG_TRACE('Application acking old message immediately');
      const oldAckRanges = new Map();
      oldAckRanges.set(messageOutcome, [[messageId, messageId]]);
      this._sendAck(oldAckRanges); // send settlement outcome for message
      return;
    }

    // We can't regress the ack_state this way; message.acknowledge() throws if called
    // more than once.
    this._applicationAcks.updateAckState(messageId, ApplicationAckState.ACKED_NOT_SENT, messageOutcome);
    // if the message outcome is not ACCEPTED, then flush the buffer immediately
    if(messageOutcome !== MessageOutcome.ACCEPTED) {
      this._sendAcks(true); // send the accumulated Ack(s) and Nacks immediately
    } else {
      this._setTransportAckTimer();
    }
  }

  getDestination() {
    this._endpointEnsure();
    return this._destination;
  }

  isDisconnected() {
    if (!this.getCurrentState()) return true;
    return (
      this.getActiveState(ConsumerStateNames.UNBOUND) ||
      this.getActiveState(ConsumerStateNames.UNBOUND_AWAITING_FLOWOPEN)
    );
  }

  requestStartDispatchUser() {
    this._userDispatch = true;
    this.applyStartDispatch();
  }

  requestStartDispatchFSM() {
    this._fsmDispatch = true;
    this.applyStartDispatch();
  }

  applyStartDispatch() {
    if (this._userDispatch && this._fsmDispatch) {
      this.log(`Starting message dispatch (fsm ${this._fsmDispatch}, user ${this._userDispatch})`);
      this._messageDispatch.start();
      this._localMaxWindowSize = this._localPreferredWindowSize;
      this._sendAcks(true);
    } else {
      this.log(`Not starting message dispatch (fsm ${this._fsmDispatch}, user ${this._userDispatch})`);
    }
  }

  /**
   * Transitions to Unbound.AwaitFlowOpen, clearing the partition group id and
   * queuing `eventName` (with `error`) for emission on the transition.
   *
   * @param {String} eventName The consumer event to queue.
   * @param {Error} [error] Optional error passed along with the event.
   * @returns {*} The result of `transitionTo()`.
   * @private
   */
  transitionToUnbound(eventName, error) {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE('Flow down, user disconnected?', this._consumer.userDisconnected);

    this._clearPartitionGroupId();

    const queueEvent = () => this._addEventToEmit(eventName, error);
    return this.transitionTo(this.Unbound.AwaitFlowOpen, queueEvent);
  }

  /**
   * Choice point for FLOW_UNBOUND received while the flow is up. Either
   * enters Reconnecting (reconnect enabled and the error is REPLAY_STARTED or
   * GM_UNAVAILABLE) or falls back to Unbound.AwaitFlowOpen, queuing `eventName`.
   *
   * Along the way it resets the ack state on REPLAY_STARTED, clears the
   * partition group id, and acks an unsolicited unbind when the consumer has
   * an endpointErrorId and the session has the MESSAGE_REPLAY capability.
   *
   * @param {Object} properties Consumer properties; only `reconnectAttempts` is read.
   * @param {String} eventName The event to queue when not reconnecting.
   * @param {Error} [error] The error carried by the unbind, if any.
   * @returns {*} The result of `transitionTo()`.
   * @private
   */
  transitionToUnboundFromUp(properties, eventName, error) {
    const { LOG_TRACE } = this.logger;
    const consumer = this._consumer;
    LOG_TRACE('Choice point for FLOW_UNBOUND in FlowUp state.');

    // Only an OperationError with a truthy subcode takes part in the decisions below.
    const hasSubcode = Boolean(error && error instanceof OperationError && error.subcode);

    // Always clear duplicate filter state on "Replay started"
    // (no way to maintain it and drop the replayed messages as duplicates).
    if (hasSubcode && error.subcode === ErrorSubcode.REPLAY_STARTED) {
      this._transportAcks.reset();
      this._applicationAcks.reset();
    }

    this._clearPartitionGroupId();

    const session = this._sessionInterface;
    if (consumer.endpointErrorId && session.isCapable(CapabilityType.MESSAGE_REPLAY)) {
      LOG_TRACE(`Acking unsolicited unbind with endpointErrorId ${consumer.endpointErrorId}`);
      const unbindAck = SMFLib.AdProtocolMessage.getUnbindAck(consumer._flowId,
                                                              consumer.endpointErrorId,
                                                              this._transportAcks.lastAcked);
      session.sendControl(unbindAck); // Must succeed or throw
    }

    // -1 is accepted alongside positive counts as "reconnect enabled".
    const reconnectEnabled =
      properties.reconnectAttempts === -1 || properties.reconnectAttempts > 0;
    if (reconnectEnabled &&
        hasSubcode &&
        (error.subcode === ErrorSubcode.REPLAY_STARTED ||
         error.subcode === ErrorSubcode.GM_UNAVAILABLE)) {
      LOG_TRACE(`Saving errorCausingReconnect: ${error}`);
      this._errorCausingReconnect = error;
      // Reconnecting state emits the RECONNECTING event.
      return this.transitionTo(this.Reconnecting);
    }

    // Flow reconnect feature disabled, or not applicable to this error.
    const queueEvent = () => this._addEventToEmit(eventName, error);
    return this.transitionTo(this.Unbound.AwaitFlowOpen, queueEvent);
  }

  requestStopDispatchUser() {
    this._userDispatch = false;
    this.log(`Stop dispatch user (fsm ${this._fsmDispatch}, user ${this._userDispatch})`);
    this._messageDispatch.stop();
  }

  requestStopDispatchFSM() {
    this._fsmDispatch = false;
    this.log(`Stop dispatch FSM (fsm ${this._fsmDispatch}, user ${this._userDispatch})`);
    this._sendAcks(true);
  }

  _clearTransportAckTimer() {
    if (!this._transportAckTimer) return;

    clearTimeout(this._transportAckTimer);
    this._transportAckTimer = null;
  }

  _dispose() {
    this._clearTransportAckTimer();
    this._endpointClear();
    this._destination = undefined;
    this._unacked = null;
    this._messageDispatch = null;
    this._transportAcks = null;
    this._consumer = null;
    this._sessionInterface = null;
  }

  _endpointClear() {
    this._endpoint = undefined;
    this._subscription = undefined;
  }

  _endpointEnsure() {
    if (this._endpoint) {
      // The endpoint, destination and any possible topic subscription are generated
      // at the same time. Use the presence of the endpoint to determine whether
      // any of this has already been done.
      return;
    }

    const sessionInterface = this._sessionInterface;
    const properties = this._consumer._properties;
    const { queueDescriptor } = properties;

    let destination;
    let endpoint;
    let subscription;
    if (queueDescriptor.type === QueueType.QUEUE) {
      // The publish destination needs a prefix. Create a destination from
      // the descriptor, then derive the endpoint name from that.
      destination = sessionInterface.createDestinationFromDescriptor(queueDescriptor);

      // The bind target is the queue name encoded as though a topic -- no prefix
      // Use the offset information to build a bind target
      endpoint = new Queue({
        name:   destination.name,
        type:   DestinationType.QUEUE,
        offset: 0,
        bytes:  destination.bytes.substr(destination.offset),
      });

      subscription = undefined;
    } else {
      // QueueType.TOPIC_ENDPOINT
      endpoint = queueDescriptor.name
        ? sessionInterface.createDestinationFromDescriptor(queueDescriptor)
        : new Topic({ name: '\0?', offset: 0, bytes: '\0' });
      subscription = properties.topicEndpointSubscription ||
        sessionInterface.createTemporaryDestination(DestinationType.TOPIC);
      destination = subscription;
    }

    // Using Object.assign to enforce the invariant that these three properties are set together
    // or not at all.
    Object.assign(this, {
      _destination:  destination,
      _endpoint:     endpoint,
      _subscription: subscription,
    });

    // Update the properties object (if TTMP, we may modify it again)
    properties.queueDescriptor = new QueueDescriptor({
      name:    endpoint.name,
      type:    queueDescriptor.type,
      durable: queueDescriptor.durable,
    });
  }

  _resetLocalConnectionState() {
    Object.assign(this, {
      _remoteWindowSize: 0,
      _active:           undefined,
      _fsmDispatch:      false,
      _userDispatch:     true, // User flow is initially started
    });
  }

  _clearPartitionGroupId() {
    const { LOG_TRACE } = this.logger;

    LOG_TRACE('Clear the consumer.partitionGroupId; Set it to undefined');
    this._consumer.partitionGroupId = undefined;
  }

  _resetRemoteConnectionState() {
    const { LOG_TRACE } = this.logger;

    LOG_TRACE('Initializing transport acks');
    this._transportAcks = new TransportAcks();

    LOG_TRACE('Initializing application acks');
    this._applicationAcks = new ApplicationAckRingBuffer(RING_BUFFER_SIZE);
    this._oldUnacked = new Set();

    LOG_TRACE('Initializing endpointErrorId');
    this._consumer.endpointErrorId = undefined;

    LOG_TRACE('Initializing partitionGroupId');
    this._consumer.partitionGroupId = undefined;
  }

  /**
   * Sends a single ack.
   *
   * @param {Map.<Array.<ackpair>>} applicationAcks The application ack ranges to send
   * @memberof ConsumerFSM
   * @private
   */
  _sendAck(applicationAcks) {
    const transportAck = SMFLib.AdProtocolMessage.getAck(
      this._consumer.flowId,
      this._transportAcks.lastReceived,
      this.windowSize,
      applicationAcks);
    this._sessionInterface.sendControl(transportAck); // Must succeed or throw
  }

  /**
   * Adds the current ack ranges to an accumulator. Call this for each applicationAck.
   * When a range ends (we receive an unacked), or we are explicitly flushing
   * (applicationAck === null), we convert the accumulator to acks, send them, update state,
   * and reset accumulators.
   *
   * This should be called at least once, with the last invocation having applicationAck = null.
   * This invokes the flush path, which is necessary if any ranges were in progress,
   * and this is the only path that respects the forceTransportAck flag.
   *
   * @param {Object} acksPendingState State associated with this activity
   * @param {ApplicationAck} applicationAck The ack to process for ack ranges.
   *    If `null`, any pending acks are flushed and a transport ack is sent, if
   *    required.
   * @private
   */
  _addAckToRanges(acksPendingState, applicationAck = null) {
    // they will hold an Array of Arrays for each supported settlement outcomes (i.e. indexes -> 0,1,2,3)
    const currentRange = acksPendingState.currentRange;
    const ackRanges = acksPendingState.ackRanges;

    // All message settlement outcomes
    const allMessageOutcomes = MessageOutcome.values;

    const currentRangeLength = currentRange.length;
    if (applicationAck && applicationAck.state !== ApplicationAckState.UNACKED) {
      // check for the type of settlement outcome and push if same as aggregation group
      if(
        currentRangeLength === 0 ||
        ((currentRangeLength > 0)
        && currentRange[currentRangeLength - 1].settlementOutcome === applicationAck.settlementOutcome)
      ) {
        currentRange.push(applicationAck);
        return; // Wait for end of range or flush
      }
      // if settlement outcomes were flushed and applicationAck still remaining, start another aggregation group
      else if(
        ((currentRangeLength > 0)
        && currentRange[currentRangeLength - 1].settlementOutcome !== applicationAck.settlementOutcome)
      ) {
        const rangeOutcome = currentRange[currentRangeLength - 1].settlementOutcome;
        ackRanges[rangeOutcome].push(currentRange); // flush the range since we detect a change in outcomes

        acksPendingState.currentRange = []; // Clear the accumulators for this range.
        acksPendingState.currentRange.push(applicationAck); // push the detected change
        return; // Wait for end of range or flush
      }
    }

    // We received an UNACKED or a null ack (flush). Complete this range for each of the settlement outcomes.
    if (currentRangeLength) {
      const rangeOutcome = currentRange[currentRangeLength - 1].settlementOutcome;
      ackRanges[rangeOutcome].push(currentRange);
    }

    // If we're flushing or we've hit the protocol limit for ranges in a single ack,
    // we send ack messages, update ack states and clear the accumulator.
    // This condition will be true at least once (must flush at the end).

    // aggregate the total length of the ack/nack ranges
    let totalRangeLen  = 0;
    for(let i = 0; i < allMessageOutcomes.length; i ++) {
      totalRangeLen += ackRanges[allMessageOutcomes[i]].length;
    }

    if (applicationAck === null || (totalRangeLen === SMFLib.AdProtocolMessage.MAX_CLIENT_ACK_RANGES)) {
      // Reduce each range for each of the settlement outcomes to a first and last ID.
      const bareRanges = new Map(); // ackRanges.map(range => [range[0].id, range[range.length - 1].id]);
      let hasAnyBareRanges = false;

      for(let i = 0; i < allMessageOutcomes.length; i ++) {
        // check for value and only put real values into map
        if(ackRanges[allMessageOutcomes[i]].length > 0) {
          bareRanges.set(allMessageOutcomes[i], ackRanges[allMessageOutcomes[i]].map(
            range => [range[0].id, range[range.length - 1].id]
          ));
          // we have written at least one outcome ack/nack range into the map
          hasAnyBareRanges = true;
        }
      }

      if (hasAnyBareRanges || acksPendingState.forceTransportAck) {
        // We send the ranges. This could throw.
        const { LOG_TRACE } = this.logger;

        this._sendAck(bareRanges); // now contains a Map of Arrays to pass settlement outcomes to Broker

        // expectation is only ONE NACK before we flush the (N)acks in the RingBuffer to the broker
        const failedBareRanges = bareRanges.get(MessageOutcome.FAILED);
        const rejectedBareRanges = bareRanges.get(MessageOutcome.REJECTED);

        // Warning: this log statement reduces debug performance by about 25%!
        LOG_TRACE(
          'Sent ack: ',
          `Transport ack: ${this._transportAcks.lastReceived}`,
          `Application acks(ACCEPTED): ${bareRanges.has(MessageOutcome.ACCEPTED)
            ? bareRanges.get(MessageOutcome.ACCEPTED).map(g => `[${g[0]}..${g[1]}]`)
            : '[]'}\n`,
          `Application Nacks(FAILED): ${bareRanges.has(MessageOutcome.FAILED)
            ? `[${failedBareRanges[0][0]}..${failedBareRanges[0][1]}]`
            : '[]'}\n`,
          `Application Nacks(REJECTED): ${bareRanges.has(MessageOutcome.REJECTED)
            ? `[${rejectedBareRanges[0][0]}..${rejectedBareRanges[0][1]}]`
            : '[]'}`
        );
        // If the intent was to force a transport ack, that condition is fulfilled.
        this._transportAcks.setAcked();
        acksPendingState.forceTransportAck = false;
      }

      // Since the range send succeeded, update the ack states.
      for(let i = 0; i < allMessageOutcomes.length; i ++) {
        ackRanges[allMessageOutcomes[i]].forEach((ackRange) => {
          ackRange.forEach((ack) => {
            if (ack.state !== ApplicationAckState.ACKED_SENT) {
              try {
                this._applicationAcks.updateAckState(ack.id,
                                                     ApplicationAckState.ACKED_SENT);
              } catch (e) {
                const { LOG_ERROR } = this.logger;
                LOG_ERROR(`Marking ack ${ack.id} as sent failed: ${e}`);
              }
            }
          });
        });
      }

      // Clear the main accumulators since it has hit the wire.
      acksPendingState.ackRanges = [];
      for(let i = 0; i < allMessageOutcomes.length; i ++) {
        acksPendingState.ackRanges[allMessageOutcomes[i]] = [];
      }
    }
    // Clear the accumulators for this range.
    acksPendingState.currentRange = [];
  }

  /**
   * Sends acknowledgements, along with a window update for this flow.
   *
   * @param {Boolean} forceTransportAck `true` sends transport ack even if no acks pending
   * @private
   */
  _sendAcks(forceTransportAck = false) {
    this._clearTransportAckTimer();

    const applicationAcks = this._applicationAcks;
    const transportAcks = this._transportAcks;
    // All message settlement outcomes
    const allMessageOutcomes = MessageOutcome.values;

    // This state is temporary and shared with the ack generating function.
    const acksPendingState = {
      forceTransportAck: forceTransportAck || (transportAcks.acksPending > 0),
      ackRanges:         [], // holds an Array of Arrays for each supported settlement outcomes (i.e. indexes -> 0,1,2,3)
      currentRange:      [], // holds an Array for each supported settlement outcomes per aggregation (i.e. indexes -> 0,1,2,3)
    };

    // initialize the ackRanges with Array of Arrays
    for(let i = 0; i < allMessageOutcomes.length; i ++) {
      acksPendingState.ackRanges[allMessageOutcomes[i]] = [];
    }

    // If the first ringbuffer ack state is acked and there are no unacked before this,
    // we can create an ack range that includes zero.
    // If the first ringbuffer ack state is unacked, we can say nothing about the
    // range before the first acked ID in the ringbuffer.
    const front = this._applicationAcks.front();
    if (this._oldUnacked.size === 0 && front && front.state !== ApplicationAckState.UNACKED) {
      // State is acked_sent so we don't try to update the ring buffer.
      this._addAckToRanges(acksPendingState, ZERO_APP_ACK);
      // We know that front will be added to this range by the iteration.
    }

    // Build ack ranges using the accumulator
    applicationAcks.forEach(ack => this._addAckToRanges(acksPendingState, ack));
    // Flush the last pending ack range.
    this._addAckToRanges(acksPendingState);

    // On successful exit, we've sent and cleared all accumulators, and we've sent
    // a transport ack if we were supposed to do that.
    assert(acksPendingState.forceTransportAck === false);
    assert(acksPendingState.currentRange.length === 0);
    for(let i = 0; i < allMessageOutcomes.length; i ++) {
      assert(acksPendingState.ackRanges[allMessageOutcomes[i]].length === 0);
    }
  }

  _setTransportAckTimer() {
    if (this._transportAckTimer) return;
    if (this._consumer.disposed) return;
    this._transportAckTimer = setTimeout(
      () => this._sendAcks(true),
      this._acknowledgeTimeoutInMsecs
    );
  }

  get maxWindowSize() {
    return Math.min(this._localMaxWindowSize,
                    this._remoteWindowSize || Number.POSITIVE_INFINITY);
  }

  get windowSize() {
    return this.maxWindowSize - this._messageDispatch.length;
  }

  get maxPendingAcks() {
    return this.windowSize * this._acknowledgeThreshold / 100.0;
  }

  get hasAutoAckSupport() {
    return this._hasAutoAckSupport;
  }

}

// Export the ConsumerFSM class as a named property of this module.
module.exports.ConsumerFSM = ConsumerFSM;
