// import { replicateRxCollection, RxReplicationState } from '../replication';

// @ts-ignore

import { context, trace } from '@opentelemetry/api'
import debug from 'debug'
import IsomorphicWebSocket from 'isomorphic-ws'
import _ from 'lodash'
import ReconnectingWebSocket from 'reconnecting-websocket'
import { errorToPlainJson, getAssumedMasterState, newRxError, randomCouchString, RxError, toArray, writeDocToDocState, type RxReplicationWriteToMasterRow } from 'rxdb'
import { replicateRxCollection, RxReplicationState } from 'rxdb/plugins/replication'
import { deepEqual } from 'rxdb/plugins/utils'
import { BehaviorSubject, filter, firstValueFrom, map, Subject } from 'rxjs'
import uuid from 'time-uuid'
import type { ClientOptions } from 'ws'

import { masterSocketLock, setMasterAvailable } from '@/data/data-utils.ts'
import { posSync0 } from '@/data/PosSyncState.ts'
import type { MasterCommand } from '@/lib/master-caller/master-caller.types.ts'
import { createSpan, runWithCurrentContext, updateConnectionStatus } from '@/lib/open-telemetry.ts'
import { getDeviceId } from '@/shared/getDeviceId'
import { wakeUpEE } from '@/shared/wakeUpEE.ts'

/**
 * Options for `RxdbClient.replicateWithWebsocketServer`, merged with the
 * `ws` package's `ClientOptions`.
 */
export type WebsocketClientOptions<RxDocType> = ClientOptions & {
  /**
   * Collection to replicate. Typed loosely; this module reads its `name`,
   * `schema.primaryPath` and `database.token`.
   */
  collection: any
  /** Forwarded to `replicateRxCollection` as the replication identifier. */
  replicationIdentifier?: string
  /** Forwarded to `replicateRxCollection` as its `live` flag. */
  live?: boolean
  /** Batch size handed to both the pull and the push side. */
  batchSize?: number
  /** Recorded as the `collection` attribute on the pull/push telemetry spans. */
  dbName?: string
  /** NOTE(review): not read anywhere in this module — confirm it is still needed. */
  validation?: boolean
}

/**
 * Replication state carrying an extra flag that mirrors the websocket
 * connection status.
 */
export class RxReplicationStateClient<RxDocType, CheckpointType> extends RxReplicationState<RxDocType, CheckpointType> {
  /**
   * Whether the client currently counts as connected. Starts out `false`;
   * `RxdbClient.replicateWithWebsocketServer` overwrites it on every
   * connection change.
   */
  connected?: boolean = false
}

/**
 * Sanity check for the WebSocket constructor handed to the reconnecting client.
 *
 * Copied and adapted from the 'reconnecting-websocket' npm module.
 * Some bundlers have problems with bundling the isomorphic-ws plugin,
 * so the constructor is verified up front in order to fail with a
 * helpful error instead of an obscure one later.
 *
 * @param w - candidate WebSocket constructor; accepted when it is truthy and
 *            exposes the static `CLOSING` constant with value `2`.
 * @throws Error with message `websocket not valid` (after dumping `w` to the console).
 */
export function ensureIsWebsocket(w: typeof IsomorphicWebSocket) {
  const looksLikeWebSocket = Boolean(w) && w.CLOSING === 2
  if (looksLikeWebSocket) {
    return
  }
  console.dir(w)
  throw new Error('websocket not valid')
}

// Debug logger for this module, registered under the `sync:offline-client` namespace.
const log = debug('sync:offline-client')

// Module-level singleton; stays unassigned until `getRxdbClient` creates the first client.
let rxdbClient: RxdbClient

/**
 * Returns the shared `RxdbClient`, creating it on first use.
 *
 * @param collectionNames - collections for a newly created client; pass `null`
 *        to only look up an existing client without creating one.
 * @param url - websocket URL for a newly created client. Both arguments are
 *        ignored once the singleton exists.
 * @returns the connected singleton, or `null` when none exists yet and
 *          `collectionNames` is `null`.
 */
export async function getRxdbClient(collectionNames: string[] | null, url: string) {
  const mustCreate = !rxdbClient
  if (mustCreate) {
    if (!collectionNames) {
      return null
    }
    rxdbClient = new RxdbClient(collectionNames, url)
  }
  // A no-op for an already connected client.
  await rxdbClient.connectToMaster()
  return rxdbClient
}

/**
 * Websocket client for the offline sync channel to the master endpoint at `url`.
 *
 * One instance owns a single `ReconnectingWebSocket`. Every parsed frame that
 * is not part of the ping or stream-registration handshake is published on an
 * internal message subject; the request/response code picks its reply out of
 * that subject by `_id`.
 */
export class RxdbClient {
  /** Shared socket; `null` until `connectToMaster` has run. */
  private globalWsClient: ReconnectingWebSocket | null = null
  /** Collection names sent with every `masterChangeStream$` request. */
  private syncCollections: string[] = []
  private url: string
  /** Parsed incoming frames plus locally injected `error` / timeout markers. */
  private message$: Subject<any>
  /** Socket errors wrapped as `RC_STREAM` RxErrors. */
  private error$: Subject<any>
  /** Retry timer of the pending stream registration, if any. */
  private streamRequestTimeout: any = null
  /** Bumped on every `masterConnected` frame and compared with `streamRegistered` replies. */
  private streamRequestId = 0
  /** True while a ping is in flight or the next one is already scheduled. */
  private pingLock = false
  private pingResolver: any = null
  private pingRejecter: any = null

  /** Emits every mismatch found by `revalidateDoc`. */
  public revalidateError$: Subject<{ document: any; assumedMasterState: any }>
  /** `true` after a `masterConnected` frame, `false` after the socket closed. */
  public connected$: BehaviorSubject<boolean>

  /**
   * @param collectionNames - collections requested in the master change stream.
   * @param url - websocket URL opened by `connectToMaster`.
   */
  constructor(collectionNames: string[], url: string) {
    this.syncCollections = collectionNames
    this.url = url
    this.connected$ = new BehaviorSubject<boolean>(false)
    this.message$ = new Subject<any>()
    this.error$ = new Subject<any>()
    this.revalidateError$ = new Subject<any>()
  }

  /**
   * Sends a `{ ping: true }` frame and waits for the `pong` reply.
   *
   * After a reply the next ping is scheduled 6.9 s later. When no reply arrives
   * within 6.9 s the socket is asked to reconnect and the loop stops; the
   * `onopen` handler installed by `connectToMaster` schedules a fresh ping.
   * Calls made while the lock is held, or before a socket exists, are no-ops.
   */
  async ping() {
    if (this.pingLock) return
    const wsClient = this.globalWsClient
    // Take the lock only once there is a socket: locking first would leave
    // `pingLock` set forever and silently disable every later ping.
    if (!wsClient) return
    this.pingLock = true
    const timeout = setTimeout(() => {
      log('Ping timeout')
      this.pingRejecter?.()
      this.pingResolver = null
      this.globalWsClient?.reconnect()
    }, 6900)
    wsClient.send(JSON.stringify({ ping: true }))
    try {
      await new Promise((resolve, reject) => {
        this.pingResolver = resolve
        this.pingRejecter = reject
      })
      clearTimeout(timeout)
      // Keep the lock during the pause so that a reconnect cannot start a second loop.
      setTimeout(() => {
        this.pingLock = false
        void this.ping()
      }, 6900)
    } catch {
      // Ping timed out; release the lock so the next `onopen` can restart the loop.
      this.pingLock = false
    }
  }

  /**
   * Sends `command` to the master and resolves with the `result` of its reply.
   *
   * @param command - payload, sent together with a generated `_id`.
   * @returns the `result` field of the matching reply.
   * @throws Error('No ws') when `connectToMaster` has not been called yet.
   * @throws Error('Command Error') when the connection was re-established
   *         while waiting (a `masterConnected` frame aborts pending requests).
   * @throws Error('Timeout') when no reply arrived within 2 s.
   */
  async callMasterCommand(command: MasterCommand) {
    if (!this.globalWsClient) {
      throw new Error('No ws')
    }
    const span = createSpan('Calling master command offline', undefined, context.active())
    const requestId = uuid()
    span?.setAttribute('storeId', `${posSync0()?.id!}`)
    span?.setAttribute('deviceId', getDeviceId())
    span?.setAttribute('command', JSON.stringify({ requestId, ...command }))
    this.globalWsClient.send(JSON.stringify({ _id: requestId, ...command }))
    // The helper always clears its timer, so an aborted request no longer
    // injects a late timeout message into the shared subject.
    const result = await this.waitForResponse(requestId, 2000)
    if (result === 'error') {
      const err = new Error('Command Error')
      span?.recordException(err)
      span?.end()
      throw err
    }
    if (result?.timeout) {
      const err = new Error('Timeout')
      span?.recordException(err)
      span?.end()
      throw err
    }
    span?.setAttribute('result', JSON.stringify(result))
    span?.end()
    return result
  }

  /**
   * Opens the reconnecting websocket and installs all socket handlers.
   * Idempotent: later calls return the socket created by the first call.
   *
   * @returns the shared `ReconnectingWebSocket`.
   * @throws Error when `isomorphic-ws` did not resolve to a usable WebSocket constructor.
   */
  async connectToMaster() {
    if (this.globalWsClient) return this.globalWsClient
    ensureIsWebsocket(IsomorphicWebSocket)
    const wsClient = new ReconnectingWebSocket(this.url, [], {
      WebSocket: IsomorphicWebSocket,
      maxReconnectionDelay: 6000,
    })

    wsClient.onopen = () => {
      log('onopen client ws')
      wsClient.send(
        JSON.stringify({
          message: 'registerEvent',
          type: 'registerClient',
        })
      )
      updateConnectionStatus('local-ws', true)
      setTimeout(() => {
        void this.ping()
      }, 6900)
    }

    wsClient.onclose = () => {
      updateConnectionStatus('local-ws', false)
      if (this.streamRequestTimeout) {
        clearTimeout(this.streamRequestTimeout)
        this.streamRequestTimeout = null
      }
      log('onclose client ws')
      this.connected$.next(false)
    }

    wsClient.onmessage = messageObj => {
      if (messageObj.data === 'pong') {
        this.pingResolver?.()
        return
      }
      let message: any
      try {
        message = JSON.parse(messageObj.data)
      } catch (err) {
        // A malformed frame must not throw out of the socket event handler.
        log('ignoring non-JSON message', err)
        return
      }
      if (message === null || typeof message !== 'object') {
        log('ignoring non-object message')
        return
      }
      if (message.message === 'masterConnected') {
        this.streamRequestId++
        log('trigger connected')
        /**
         * When the client goes offline and online again,
         * we have to send an event to pause current message$ pipe
         * for pushing
         */
        this.message$.next({
          _id: 'error',
          result: 'error',
        })
        this.connected$.next(true)

        /**
         * Because reconnecting creates a new websocket-instance,
         * we have to start the changestream from the remote again
         * each time.
         */
        const sendStreamRequest = () => {
          // A retry is already scheduled; it will pick up the current streamRequestId.
          if (this.streamRequestTimeout) return
          const streamRequest = {
            _id: 'stream',
            collections: this.syncCollections,
            method: 'masterChangeStream$',
            params: [this.streamRequestId],
          }
          wsClient.send(JSON.stringify(streamRequest))
          // Re-send every 4 s until a matching `streamRegistered` frame cancels the timer.
          this.streamRequestTimeout = setTimeout(() => {
            this.streamRequestTimeout = null
            sendStreamRequest()
          }, 4000)
        }

        sendStreamRequest()
      } else if (message._id === 'streamRegistered') {
        if (message.result === this.streamRequestId) {
          if (this.streamRequestTimeout) {
            clearTimeout(this.streamRequestTimeout)
            this.streamRequestTimeout = null
          }
        }
      } else {
        this.message$.next(message)
      }
    }

    wsClient.onerror = err => {
      const emitError = newRxError('RC_STREAM', {
        errors: toArray(err).map((er: any) => errorToPlainJson(er)),
        direction: 'pull',
      })
      this.error$.next(emitError)
    }

    this.connected$.subscribe(isConnected => {
      log('is socket connected', isConnected)
      if (!isConnected) {
        log('setMasterAvailable false')
        // setMasterAvailable(false);
      } else {
        void masterSocketLock.release()
      }
    })

    this.globalWsClient = wsClient
    // Return the socket on the first call too, matching the early return above.
    return wsClient
  }

  /**
   * Compares the local fork state of `docIds` with the assumed master state
   * and emits every difference on `revalidateError$`.
   *
   * @deprecated will be removed in the future
   * @param docIds - primary keys of the documents to check.
   * @param state - replication state whose internal state is inspected.
   */
  async revalidateDoc(docIds: any[], state: RxReplicationStateClient<any, any>) {
    const [currentForkStateList, assumedMasterState] = await Promise.all([
      state.internalReplicationState!.input.forkInstance.findDocumentsById(docIds, true),
      getAssumedMasterState(state.internalReplicationState!, docIds),
    ])
    currentForkStateList.forEach(doc => {
      const assumedMaster = assumedMasterState[doc._id]
      if (!assumedMaster) {
        this.revalidateError$.next({
          document: doc,
          assumedMasterState: undefined,
        })
        return
      }
      if (!_.isEqual(writeDocToDocState(doc, false, false), writeDocToDocState(assumedMaster.docData, false, false))) {
        this.revalidateError$.next({
          document: doc,
          assumedMasterState: assumedMaster.docData,
        })
      }
    })
  }

  /**
   * Starts a replication of `options.collection` over the shared websocket.
   *
   * Pulls use the `masterChangesSince` method, pushes use `masterWrite`; both
   * fail with `Error('Timeout')` after 4 s without a reply and with
   * `Error('Error')` when the connection was re-established while waiting.
   * A re-sync is triggered on `stream` frames for this collection, whenever
   * the connection comes up, and on the `wakedUp` event. All of these hooks
   * are released once the replication is cancelled.
   *
   * @param options - collection, batch size and replication settings.
   * @returns the replication state, extended with a `connected` flag.
   */
  async replicateWithWebsocketServer<RxDocType, CheckpointType>(options: WebsocketClientOptions<RxDocType>): Promise<RxReplicationStateClient<RxDocType, CheckpointType>> {
    const messages$ = this.message$
    // NOTE(review): the span attribute named `collection` is fed from `dbName` — confirm that is intended.
    const collectionName = options.dbName!
    const primaryPath = options.collection.schema.primaryPath

    let requestCounter = 0
    const requestFlag = randomCouchString(10)

    function getRequestId() {
      const count = requestCounter++
      return options.collection.database.token + '|' + requestFlag + '|' + count
    }

    // The handlers below use method syntax (their own `this`), hence these closures.
    const sendRequest = (request: unknown) => {
      // Look the socket up per request so a replication created before
      // `connectToMaster` works once the socket exists.
      const wsClient = this.globalWsClient
      if (!wsClient) {
        throw new Error('No ws')
      }
      wsClient.send(JSON.stringify(request))
    }
    const awaitResponse = (requestId: string, timeoutMs: number) => this.waitForResponse(requestId, timeoutMs)

    const replicationState: RxReplicationStateClient<RxDocType, CheckpointType> = replicateRxCollection<RxDocType, CheckpointType>({
      collection: options.collection,
      //@ts-ignore
      replicationIdentifier: options.replicationIdentifier,
      live: options.live,
      pull: {
        batchSize: options.batchSize,
        //@ts-ignore
        async handler(lastPulledCheckpoint: CheckpointType, batchSize: number) {
          const span = createSpan(`Pulling offline client`, undefined, context.active())
          span?.setAttribute('storeId', `${posSync0()?.id!}`)
          span?.setAttribute('deviceId', getDeviceId())
          span?.setAttribute('collection', collectionName)
          return runWithCurrentContext(span, async () => {
            try {
              const requestId = getRequestId()
              // Same object as `lastPulledCheckpoint`; loosely typed because
              // `CheckpointType` is generic here.
              const checkpoint = lastPulledCheckpoint as any
              if (checkpoint && !checkpoint.lwt) {
                checkpoint.lwt = 0
              }
              // Never ask for changes newer than the last lwt reported by the master.
              if (checkpoint && !!checkpoint.lwtMaster && checkpoint.lwt > checkpoint.lwtMaster) {
                checkpoint.lwt = checkpoint.lwtMaster
              }
              const request = {
                _id: requestId,
                collection: options.collection.name,
                method: 'masterChangesSince',
                params: [lastPulledCheckpoint, batchSize],
              }
              span?.setAttribute('request', JSON.stringify(request))
              sendRequest(request)
              const result = await awaitResponse(requestId, 4000)
              if (result === 'error') {
                throw new Error('Error')
              }
              if (result?.timeout) {
                throw new Error('Timeout')
              }
              if (!!checkpoint?.sequence) {
                // Carry the previous sequence over into the new checkpoint.
                _.set(result, 'checkpoint.sequence', checkpoint.sequence)
              }
              span?.setAttribute('result', JSON.stringify(result))
              if (result.checkpoint) {
                result.checkpoint.lwtMaster = result.checkpoint.lwt
              }
              return result
            } catch (err) {
              span?.recordException(err as Error)
              throw err
            } finally {
              // End the span on every exit path, including send/serialisation failures.
              span?.end()
            }
          })
        },
      },
      push: {
        batchSize: options.batchSize,
        async handler(docs: RxReplicationWriteToMasterRow<RxDocType>[]) {
          const span = createSpan(`Pushing offline client`, undefined, context.active())
          span?.setAttribute('storeId', `${posSync0()?.id!}`)
          span?.setAttribute('deviceId', getDeviceId())
          span?.setAttribute('collection', collectionName)
          const docsById: Record<any, any> = {}
          docs.forEach(doc => {
            docsById[(doc.newDocumentState as any)[primaryPath]] = doc
          })
          return runWithCurrentContext(span, async () => {
            try {
              const requestId = getRequestId()
              const request = {
                _id: requestId,
                collection: options.collection.name,
                method: 'masterWrite',
                params: [docs],
              }
              span?.setAttribute('request', JSON.stringify(request))
              sendRequest(request)
              const result = await awaitResponse(requestId, 4000)
              if (result === 'error') {
                throw new Error('Error')
              }
              if (result?.timeout) {
                throw new Error('Timeout')
              }
              span?.setAttribute('result', JSON.stringify(result))
              // Returned documents that equal what was just pushed are not conflicts.
              return result.filter((doc: any) => {
                // The JSON round trip strips `undefined` fields before comparing.
                const foundDoc = JSON.parse(JSON.stringify(docsById[doc[primaryPath]].newDocumentState))
                return !deepEqual(doc, foundDoc)
              })
            } catch (err) {
              span?.recordException(err as Error)
              throw err
            } finally {
              span?.end()
            }
          })
        },
      },
    })

    const subscriptions = [
      // The master announced a change for this collection: pull again.
      messages$.pipe(filter(msg => msg._id === 'stream' && msg.collection === options.collection.name)).subscribe(() => {
        replicationState.reSync()
      }),
      this.error$.subscribe(err => replicationState.subjects.error.next(err)),
      this.connected$.subscribe(isConnected => {
        if (isConnected) {
          /**
           * When the client goes offline and online again,
           * we have to send a 'RESYNC' signal because the client
           * might have missed out events while being offline.
           */
          replicationState.reSync()
        }
        replicationState.connected = isConnected
      }),
    ]

    const wakedUpTrigger = () => {
      replicationState.reSync()
    }
    wakeUpEE.on('wakedUp', wakedUpTrigger)
    replicationState.canceled$.subscribe(bool => {
      if (bool) {
        wakeUpEE.removeListener('wakedUp', wakedUpTrigger)
        // Without this a cancelled replication would still be re-synced on
        // every stream / connection event for the lifetime of the client.
        subscriptions.forEach(subscription => subscription.unsubscribe())
      }
    })
    return replicationState
  }

  /**
   * Resolves with the `result` of the first message whose `_id` is `requestId`
   * or `'error'`. After `timeoutMs` a `{ timeout: true }` result is injected
   * for `requestId`, so the promise always settles. The timer is cleared on
   * every exit path.
   */
  private async waitForResponse(requestId: string, timeoutMs: number): Promise<any> {
    const timer = setTimeout(() => {
      this.message$.next({
        _id: requestId,
        result: {
          timeout: true,
        },
      })
    }, timeoutMs)
    try {
      return await firstValueFrom(
        this.message$.pipe(
          filter(msg => msg._id === requestId || msg._id === 'error'),
          map(msg => msg.result)
        )
      )
    } finally {
      clearTimeout(timer)
    }
  }
}
