import { eventChannel } from "redux-saga";
import {
  all,
  put,
  takeEvery,
  fork,
  select,
  call,
  take,
} from "redux-saga/effects";
import Feed, { EventType } from "@dxfeed/api";
import moment from "moment";
import { wsUrl } from "../actions/helpers";
// Action Types
// Dispatched via dxFeedSocketListen; listenSocketLoad starts listenSocket for it.
export const DXFEED_SOCKET_LISTEN = "DX/LISTEN/TOP";
// Put by listenSocket for every payload taken from the channel; handled by the reducer.
export const DXFEED_SOCKET_SUCCESS = "DX/SUB/TOP/SUCCESS";
// Put by listenSocket when it catches an error; the reducer has no case for it.
export const DXFEED_SOCKET_FAILURE = "DX/SUB/TOP/FAILURE";
// Set to true by listenSocket after its first connect() call so that later
// listen actions reuse the cached socket instead of connecting again.
let connected = false;
// Action Creators
/**
 * Creates the action that asks the saga to listen for a set of symbols.
 *
 * @param {Array} symbols - Symbols to subscribe to (listenSocket calls
 *   `.includes()` on each entry, so presumably strings — confirm with callers).
 * @param {*} [options=false] - Carried on the action as-is.
 * @param {boolean} [disconnect=false] - Carried on the action as-is.
 * @param {*} [risk=false] - Carried on the action as-is.
 * @returns {{type: string, symbols: Array, options: *, disconnect: boolean, risk: *}}
 *   A DXFEED_SOCKET_LISTEN action.
 */
export const dxFeedSocketListen = (
  symbols,
  options = false,
  disconnect = false,
  risk = false
) => {
  const listenAction = {
    type: DXFEED_SOCKET_LISTEN,
    symbols,
    options,
    disconnect,
    risk,
  };
  return listenAction;
};
/**
 * Creates the action that carries fresh socket data to the reducer.
 *
 * @param {*} data - Payload for the reducer, which reads `data.currentSubs`
 *   and `data.socketData`.
 * @param {boolean} [unsub=false] - Carried on the action as-is; the reducer in
 *   this module does not read it.
 * @returns {{type: string, data: *, unsub: boolean}} A DXFEED_SOCKET_SUCCESS action.
 */
export const dxFeedSocketSuccess = (data, unsub = false) => {
  const successAction = { type: DXFEED_SOCKET_SUCCESS, data, unsub };
  return successAction;
};
/**
 * Creates the action that reports a failure raised while listening.
 *
 * @param {*} error - Whatever was caught; carried on the action as-is.
 * @returns {{type: string, error: *}} A DXFEED_SOCKET_FAILURE action.
 */
export const dxFeedSocketFailure = (error) => {
  const failureAction = { type: DXFEED_SOCKET_FAILURE, error };
  return failureAction;
};
// Feed instance most recently created by connect().
let feed;
// Value returned by feed.subscribeTimeSeries() for the candle subscription;
// createSocketChannel calls it before subscribing again and on channel close.
let unsub;
// Value returned by feed.subscribe() for the quote / time-and-sale
// subscription; called at the same points as `unsub`.
let quoteSub;
// Result of connect() cached by listenSocket and passed to createSocketChannel.
let socket;
// Event counter shared by both feed callbacks in createSocketChannel; they use
// it to decide which events are forwarded to the channel.
let i = 0;
// Not referenced anywhere in the visible part of this module.
let token;
// Side copy of the latest socket data: replaced by listenSocket after each
// payload and written by the quote callback; the feed callbacks read it to
// merge previously seen price / bid / ask values into new payloads.
let socketDataClone = new Map();
/**
 * Opens a fresh dxFeed connection, tearing down the previous one first.
 *
 * Any feed currently held in the module-level `feed` is disconnected and
 * dropped; a new `Feed` is then always created and connected to `wsUrl`.
 *
 * @param {boolean} disconnect - Kept for call-site compatibility. An existing
 *   feed is torn down regardless of this flag, and a new feed is always
 *   created afterwards.
 * @returns {Promise<Feed>} Resolves with the newly created feed.
 */
const connect = async (disconnect) => {
  // Guard on the feed itself. The earlier `!!feed || disconnect` check called
  // feed.disconnect() on an undefined feed whenever `disconnect` was true
  // before any connection existed, which threw a TypeError.
  if (feed) {
    feed.disconnect();
    feed = null;
  }
  feed = new Feed();
  feed.connect(wsUrl);
  // An async function already wraps its return value in a promise.
  return feed;
};
/**
 * Builds a redux-saga event channel that forwards dxFeed candle and quote
 * events to the saga as `{ ticker, price?, bidPrice?, askPrice? }` payloads.
 *
 * When `symbols` is non-empty or `disconnect` is truthy, the subscriptions
 * currently held in the module-level `unsub` / `quoteSub` are released and
 * replaced by:
 *  - a Candle time-series subscription for every symbol that does not contain
 *    "HSFX";
 *  - a Quote / TimeAndSale subscription for all symbols, with the
 *    "{=1m,price=mark}" suffix removed from symbols containing "COMP".
 *
 * Forwarding is throttled through the shared module counter `i`: a candle is
 * emitted only when the counter is a multiple of 50, a quote when it is even.
 *
 * @param {object} feed - Connection exposing `subscribe` and `subscribeTimeSeries`.
 * @param {string[]} symbols - Symbols to subscribe to.
 * @param {*} options - Not used by this function; accepted for call-site compatibility.
 * @param {boolean} disconnect - Forces a resubscribe even when `symbols` is empty.
 * @param {*} risk - Not used by this function; accepted for call-site compatibility.
 * @returns {object} The event channel; closing it releases whatever
 *   subscriptions the module currently holds.
 */
const createSocketChannel = (feed, symbols, options, disconnect, risk) =>
  eventChannel((emit) => {
    if (!feed) {
      // NOTE(review): connect() replaces the module-level feed, not this
      // (shadowing) parameter, so the subscriptions below would still run
      // against the falsy argument. listenSocket always passes a feed.
      connect(false);
    }
    if (symbols.length > 0 || disconnect) {
      // Drop the previous subscriptions before creating new ones.
      unsub?.();
      quoteSub?.();
      const eventTypeTimeSeries = [EventType.Candle];
      const eventTypeQuote = [EventType.Quote, EventType.TimeAndSale];
      // Epoch milliseconds divided by 1000, i.e. seconds.
      // NOTE(review): confirm which unit subscribeTimeSeries expects here.
      const oneMinAgo = moment().subtract(1, "minute").valueOf() / 1000;
      const candleSymbols = symbols.filter(
        (symbol) => !symbol.includes("HSFX")
      );
      const quoteSymbols = symbols.map((symbol) =>
        symbol.includes("COMP")
          ? symbol.replace("{=1m,price=mark}", "")
          : symbol
      );
      unsub = feed.subscribeTimeSeries(
        eventTypeTimeSeries,
        candleSymbols,
        oneMinAgo,
        (event) => {
          const { close, eventSymbol } = event;
          const ticker = eventSymbol.replace("{mm=COMP}{=1m,price=mark}", "");
          const cached = socketDataClone?.get(ticker);
          const payload = { ticker, price: close };
          // Carry over the last known bid/ask so a candle does not erase them.
          if (cached?.bidPrice) {
            payload.bidPrice = cached.bidPrice;
            payload.askPrice = cached.askPrice;
          }
          if (i % 50 === 0) {
            emit(payload);
          }
          i++;
        }
      );
      quoteSub = feed.subscribe(eventTypeQuote, quoteSymbols, (event) => {
        const { eventSymbol, bidPrice, askPrice } = event;
        const ticker = eventSymbol
          .replace("{mm=COMP}", "")
          .replace(":HSFX", "");
        const payload = { ticker, bidPrice, askPrice };
        // Carry over the last known price so a quote does not erase it.
        const cachedPrice = socketDataClone?.get(ticker)?.price;
        if (cachedPrice) {
          payload.price = cachedPrice;
        }
        // Cache under the stripped ticker: every reader of socketDataClone
        // looks entries up by ticker, so keying by the raw eventSymbol made
        // quotes for suffixed symbols invisible to the candle callback.
        socketDataClone.set(ticker, payload);
        if (i % 2 === 0) {
          emit(payload);
        }
        i++;
      });
    }
    return () => {
      // Either handle may be undefined when this channel never subscribed
      // (empty symbols without disconnect), so guard before calling.
      unsub?.();
      quoteSub?.();
    };
  });

// Sagas
/**
 * Worker saga for DXFEED_SOCKET_LISTEN.
 *
 * Connects once (the first time it runs), opens an event channel for the
 * requested symbols and then loops forever: each payload taken from the
 * channel is merged into a copy of the store's `socketData` map and
 * dispatched through dxFeedSocketSuccess. Any thrown error is reported with
 * dxFeedSocketFailure.
 *
 * @param {{symbols: string[], options: *, disconnect: boolean, risk: *}} action
 *   The listen action; symbols containing "options" are not subscribed.
 */
function* listenSocket(action) {
  try {
    const { symbols, options, disconnect, risk } = action;
    if (!connected) {
      socket = yield call(connect, disconnect);
      connected = true;
    }
    const uniqueSubs = symbols.filter((symbol) => !symbol.includes("options"));
    const socketChannel = yield call(
      createSocketChannel,
      socket,
      uniqueSubs,
      options,
      disconnect,
      risk
    );
    // A disconnect request or an empty symbol list publishes an empty map.
    const shouldReset = disconnect || !symbols.length;
    while (true) {
      const { socketData: storeData } = yield select(
        (state) => state.dxFeedSocket
      );
      const payload = yield take(socketChannel);
      // Work on a copy: the selected map is the one held in the Redux store
      // and must only change through the reducer.
      const socketData = new Map(storeData);
      socketData.set(payload.ticker, payload);
      // Keep the side copy read by the feed callbacks in step with the data.
      socketDataClone = new Map(socketData);
      if (shouldReset) {
        socketData.clear();
      }
      yield put(
        dxFeedSocketSuccess({
          socketData,
          currentSubs: uniqueSubs,
        })
      );
    }
  } catch (error) {
    yield put(dxFeedSocketFailure(error));
  }
}

/**
 * Watcher saga: uses takeEvery to run listenSocket for each
 * DXFEED_SOCKET_LISTEN action, passing the action along.
 *
 * NOTE(review): listenSocket loops indefinitely, so every listen action adds
 * another long-lived run — confirm this is intended.
 */
function* listenSocketLoad() {
  yield takeEvery(DXFEED_SOCKET_LISTEN, listenSocket);
}

// Root Saga
/**
 * Root saga of this module: forks the listen watcher.
 *
 * The fork effect is passed to `all()` directly. Previously the watcher was
 * forked first and the resulting task handle — not an effect — was handed to
 * `all()`, which only worked because non-effect values pass straight through.
 */
export function* saga() {
  yield all([fork(listenSocketLoad)]);
}

// Initial reducer state.
const INIT_STATE = {
  // Symbols of the most recent subscription, as reported by DXFEED_SOCKET_SUCCESS.
  currentSubs: [],
  // Latest payload per ticker; listenSocket stores each payload under payload.ticker.
  socketData: new Map(),
};
// Reducer
/**
 * Reducer for the dxFeed socket slice.
 *
 * On DXFEED_SOCKET_SUCCESS it replaces `currentSubs` with the value from the
 * action and stores a fresh copy of the action's `socketData` map; every
 * other action leaves the state object untouched.
 *
 * @param {object} [state=INIT_STATE] - Current slice state.
 * @param {{type: string, data?: {currentSubs: Array, socketData: Map}}} action
 * @returns {object} The next slice state.
 */
const reducer = (state = INIT_STATE, action) => {
  if (action.type !== DXFEED_SOCKET_SUCCESS) {
    return state;
  }
  const { currentSubs, socketData } = action.data;
  return {
    ...state,
    currentSubs,
    socketData: new Map(socketData),
  };
};

export default reducer;