// @ts-ignore

import { context } from '@opentelemetry/api'
import debug from 'debug'
import IsomorphicWebSocket from 'isomorphic-ws'
import ReconnectingWebSocket from 'reconnecting-websocket'
import { errorToPlainJson, getFromMapOrCreate, newRxError, rxStorageInstanceToReplicationHandler, toArray, type RxDatabase, type RxReplicationHandler, type StringKeys } from 'rxdb'
import { BehaviorSubject, Subject } from 'rxjs'

import { posSync0 } from '@/data/PosSyncState.ts'
import type { ClientCommand } from '@/lib/client-caller/ClientCommand.types.ts'
import { getMasterHandler } from '@/lib/master-caller/master-handler.ts'
import { createSpan, runWithCurrentContext, updateConnectionStatus } from '@/lib/open-telemetry.ts'
import { getDeviceId } from '@/shared/getDeviceId'

/**
 * Fields common to every frame exchanged with the socket server, in both directions.
 *
 * - `clientId` identifies the client a frame belongs to.
 * - `_id` correlates a response with its request; this module also emits the fixed
 *   ids `'stream'`, `'streamRegistered'` and `'command'`.
 * - `collection` names the single collection a replication call targets.
 * - `collections` lists the collections whose change streams a client wants to follow.
 */
type WebsocketEnvelope = {
  _id: string
  clientId: string
  collection?: string
  collections?: string[]
}

/**
 * Inbound request from a client.
 *
 * `method` is invoked with `params` either on the master handler (when `isMasterCall`
 * is set) or on the replication handler of `collection`.
 */
type WebsocketMessageType = WebsocketEnvelope & {
  isMasterCall?: boolean
  method: StringKeys<RxReplicationHandler<any, any>>
  params: any[]
}

/** Outbound reply (or pushed stream event) carrying the `result` of a request. */
type WebsocketMessageResponseType = WebsocketEnvelope & {
  result: any
}

// Debug logger for this module, published under the 'sync:offline-master' namespace.
const log = debug('sync:offline-master')

// Singleton client bundle ({ url, socket, connected$, message$, error$ }) assigned by
// createWebSocketClient(); stays undefined until the first socket has opened.
let globalWsClient: any
// Replication handlers keyed by collection name; filled in by connectMasterToSocketServer().
const cacheHandler: Record<string, any> = {}
// Ids of clients that registered for change streams; broadcastCommand() sends to each key.
const clientIds: Record<string, any> = {}
// True while a ping() round is in flight or its follow-up is scheduled; prevents overlapping rounds.
let pingLock = false
// Settle callbacks of the promise the current ping() round awaits:
// the resolver is called when a 'pong' frame arrives, the rejecter when the ping times out.
let pingResolver: any = null
let pingRejecter: any = null

/**
 * Closes the shared master socket, if one has been created.
 *
 * Does nothing when no client exists yet. The module-level client reference itself
 * is left in place.
 */
export function destroyGlobalWsMaster() {
  if (!globalWsClient) return
  const { socket } = globalWsClient
  socket.close()
}

/**
 * Sends `command` to every client currently recorded in `clientIds`.
 *
 * One frame is sent per client, each tagged with `_id: 'command'` and that client's id;
 * the command's own fields are spread last.
 *
 * @param command The command payload to deliver.
 * @throws Error('No ws') when the master socket has not been created yet.
 */
export function broadcastCommand(command: ClientCommand) {
  if (!globalWsClient) {
    throw new Error('No ws')
  }
  for (const clientId of Object.keys(clientIds)) {
    const envelope = { _id: 'command', clientId, ...command }
    globalWsClient.socket.send(JSON.stringify(envelope))
  }
}

/**
 * Returns the shared master websocket client, creating and opening it on first use.
 *
 * On every (re)open the socket registers itself as master, marks the 'local-ws'
 * connection as up and schedules a heartbeat round 25 s later. The returned promise
 * resolves once the socket has opened for the first time.
 *
 * @param url Address of the socket server. Ignored when a client already exists.
 * @returns The client bundle `{ url, socket, connected$, message$, error$ }`.
 */
export async function createWebSocketClient(url: string) {
  if (globalWsClient) {
    return globalWsClient
  }
  console.log('Creating ws')
  const wsClient = new ReconnectingWebSocket(url, [], {
    WebSocket: IsomorphicWebSocket,
  })

  const connected$ = new BehaviorSubject<boolean>(false)
  await new Promise<void>(res => {
    // onopen stays installed after the promise resolves, so it also runs on reconnects.
    wsClient.onopen = () => {
      log('onopen master ws')
      wsClient.send(
        JSON.stringify({
          message: 'registerEvent',
          type: 'registerMaster',
        })
      )
      updateConnectionStatus('local-ws', true)
      connected$.next(true)
      setTimeout(() => {
        ping()
      }, 25000)
      // No-op on every open after the first one.
      res()
    }
  })
  wsClient.onclose = () => {
    log('onclose master ws')
    updateConnectionStatus('local-ws', false)
    connected$.next(false)
  }

  const message$ = new Subject<any>()
  wsClient.onmessage = messageObj => {
    // Heartbeat replies are plain text, not JSON; they settle the pending ping() round.
    if (messageObj.data === 'pong') {
      pingResolver && pingResolver()
      return
    }
    let message: unknown
    try {
      message = JSON.parse(messageObj.data)
    } catch (err) {
      // A malformed frame must not throw out of the socket's event handler;
      // drop it and keep the connection serving subsequent frames.
      log('Dropping non-JSON ws message: %o', err)
      return
    }
    message$.next(message)
  }

  const error$ = new Subject<any>()
  wsClient.onerror = err => {
    const emitError = newRxError('RC_STREAM', {
      errors: toArray(err).map((er: any) => errorToPlainJson(er)),
      direction: 'pull',
    })
    error$.next(emitError)
  }

  globalWsClient = {
    url,
    socket: wsClient,
    connected$,
    message$,
    error$,
  }
  return globalWsClient
}

/**
 * Runs one heartbeat round against the master socket and schedules the next one.
 *
 * Sends `{ ping: true }` and waits for the message handler to call `pingResolver`
 * (which it does on a 'pong' frame). If no reply arrives within 6.9 s the round is
 * rejected and the socket is asked to reconnect; the loop then restarts from the
 * socket's `onopen` handler. After a successful round the next one starts 6.9 s later.
 *
 * `pingLock` ensures only one round (or scheduled follow-up) exists at a time.
 */
async function ping() {
  if (pingLock) return
  const client = globalWsClient
  // Check for a client BEFORE taking the lock: taking it first and then finding no
  // client would leave the lock held forever and silently disable the heartbeat.
  if (!client) return
  pingLock = true

  const timeout = setTimeout(() => {
    log('Ping timeout')
    pingRejecter && pingRejecter()
    pingResolver = null
    client.socket?.reconnect()
  }, 6900)

  try {
    // Inside the try so that a synchronous send failure releases the lock as well.
    client.socket.send(JSON.stringify({ ping: true }))
    await new Promise((resolve, reject) => {
      pingResolver = resolve
      pingRejecter = reject
    })
  } catch (err) {
    // Reached on timeout (timer already fired, clearTimeout is a no-op) or send failure.
    clearTimeout(timeout)
    pingLock = false
    return
  }

  clearTimeout(timeout)
  // Keep the lock until the follow-up round actually starts.
  setTimeout(() => {
    pingLock = false
    ping()
  }, 6900)
}

/**
 * Connects this process to the socket server as master and serves client requests.
 *
 * Inbound messages are dispatched as follows:
 *  - `method === 'unsubscribe'`: tears down the change-stream subscriptions of that client;
 *  - `isMasterCall`: forwards `method`/`params` to the master handler and replies with its result;
 *  - `collection` set: calls the named replication-handler method for that collection and replies;
 *  - otherwise: subscribes the client to the change streams of `collections` (once per client)
 *    and acknowledges with a `streamRegistered` frame echoing `params[0]`.
 *
 * All change-stream subscriptions are released when the master socket disconnects.
 *
 * @param options Object with `url` (socket server address) and `databases`
 *   (the RxDatabase instances searched for the requested collections).
 */
export async function connectMasterToSocketServer(options: any) {
  const { databases } = options

  const websocketClient = await createWebSocketClient(options.url)
  const wsClient = websocketClient.socket
  const message$ = websocketClient.message$

  const replicationHandlerByCollection: Map<string, RxReplicationHandler<any, any>> = new Map()

  /** Looks up (and caches) the replication handler for a collection; null when no database has it. */
  function getReplicationHandler(collectionName: string): RxReplicationHandler<any, any> | null {
    if (cacheHandler[collectionName]) {
      return cacheHandler[collectionName]
    }

    let foundHandler = null

    databases.forEach((database: RxDatabase) => {
      if (database.collections[collectionName]) {
        foundHandler = getFromMapOrCreate(replicationHandlerByCollection, collectionName, () => {
          const collection = database.collections[collectionName]
          return rxStorageInstanceToReplicationHandler(collection.storageInstance, collection.conflictHandler, database.token)
        })
      }
    })

    cacheHandler[collectionName] = foundHandler

    return foundHandler
  }

  // clientId -> teardown that releases ALL change-stream subscriptions of that client.
  const onCloseHandlers: Map<string, () => void> = new Map()
  websocketClient.connected$.subscribe((isConnected: boolean) => {
    if (!isConnected) {
      onCloseHandlers.forEach(teardown => {
        teardown()
      })
      onCloseHandlers.clear()
    }
  })

  message$.subscribe(async (message: WebsocketMessageType) => {
    // Guard before touching any field: a frame such as JSON `null` has no method
    // and must be ignored rather than dereferenced.
    if (!message?.method) return
    const clientId = message.clientId

    if (message.method.toString() === 'unsubscribe') {
      const teardown = onCloseHandlers.get(clientId)
      if (teardown) teardown()
      onCloseHandlers.delete(clientId)
      // NOTE(review): clientIds[clientId] is kept, so broadcastCommand() still targets
      // this client after it unsubscribed — confirm that this is intended.
      return
    }

    if (message.isMasterCall) {
      const span = createSpan('Handle master command', undefined, context.active())
      span?.setAttribute('storeId', `${posSync0()?.id!}`)
      span?.setAttribute('deviceId', getDeviceId())
      span?.setAttribute('message', JSON.stringify(message))
      let result: any
      try {
        result = await getMasterHandler().handleCommand(message.method, message.params)
        span?.setAttribute('result', JSON.stringify(result))
      } finally {
        // End the span even when the command throws; the error still propagates.
        span?.end()
      }
      const response: WebsocketMessageResponseType = {
        clientId,
        _id: message._id,
        result,
      }
      wsClient.send(JSON.stringify(response))
      return
    }

    if (!!message.collection) {
      const handler = getReplicationHandler(message.collection)
      if (!handler) {
        return
      }
      const method = handler[message.method]
      if (typeof method !== 'function') {
        // e.g. `masterChangeStream$` is an observable, not a callable method.
        log('Ignoring message: %s is not a callable replication handler method', message.method)
        return
      }
      const span = createSpan(`Master offline handler`, undefined, context.active())
      span?.setAttribute('storeId', `${posSync0()?.id!}`)
      span?.setAttribute('deviceId', getDeviceId())
      span?.setAttribute('message', JSON.stringify(message))
      return runWithCurrentContext(span, async () => {
        try {
          const result = await (method as any)(...message.params)
          const response: WebsocketMessageResponseType = {
            clientId,
            _id: message._id,
            collection: message.collection,
            result,
          }
          span?.setAttribute('masterOfflineResponse', JSON.stringify(response))
          wsClient.send(JSON.stringify(response))
        } finally {
          // End the span even when the handler call or the send throws.
          span?.end()
        }
      })
    }

    // Stream registration: subscribe once per client, then always acknowledge.
    if (!onCloseHandlers.has(clientId)) {
      clientIds[clientId] = true
      const teardowns: Array<() => void> = []
      for (const collection of message.collections ?? []) {
        const handler = getReplicationHandler(collection)
        if (!handler) {
          continue
        }
        const changeStreamSub = handler.masterChangeStream$.subscribe(ev => {
          const streamResponse: WebsocketMessageResponseType = {
            clientId,
            _id: 'stream',
            collection: collection,
            result: ev,
          }
          wsClient.send(JSON.stringify(streamResponse))
        })
        teardowns.push(() => changeStreamSub.unsubscribe())
      }
      // Register ONE teardown covering every collection. Storing a handler per loop
      // iteration would overwrite the previous one and leak all but the last subscription.
      if (teardowns.length > 0) {
        onCloseHandlers.set(clientId, () => teardowns.forEach(teardown => teardown()))
      }
    }
    const response: WebsocketMessageResponseType = {
      result: message.params?.[0],
      clientId,
      _id: 'streamRegistered',
    }
    wsClient.send(JSON.stringify(response))
  })

  const masterHandler = getMasterHandler()
  masterHandler.registerCommand('pingMaster', () => true)
}
