import { context } from '@opentelemetry/api'
import { captureException } from '@sentry/react'
import AwaitLock from 'await-lock'
import dayjs from 'dayjs'
import _ from 'lodash'
import pTimeout from 'p-timeout'
import Queue from 'queue'
import {
  ensureNotFalsy,
  flatClone,
  randomCouchString,
  type ReplicationPullHandlerResult,
  type ReplicationPullOptions,
  type ReplicationPushOptions,
  type RxConflictHandler,
  type RxReplicationWriteToMasterRow,
  type WithDeleted,
} from 'rxdb'
import { filter, Subject } from 'rxjs'

import { deviceSetting0 } from '@/data/DeviceSettingSignal.ts'
import { posSync0 } from '@/data/PosSyncState.ts'
import { createSpan, runWithCurrentContext } from '@/lib/open-telemetry.ts'
import { rnLog } from '@/lib/rnlog.ts'
import { getSyncLock, releaseSyncLock } from '@/lib/surrealdb/sync-state-lock.ts'
import { now } from '@/pos/logic/time-provider.ts'
import { getDeviceId } from '@/shared/getDeviceId.ts'
import SurrealClient from '@/shared/SurrealClient.ts'
import { wakeUpEE } from '@/shared/wakeUpEE.ts'

import { RxdbSyncSurreal, RxSurrealDBReplicationState, type SurrealDBCheckPointType } from './rxdb-surreal-sync.ts'

/**
 * Base time budget, in milliseconds, handed to `pTimeout` for one pull or push
 * round. The handlers multiply it by `failedCount + 1`, so a collection that
 * keeps failing is given progressively more time.
 */
const SYNC_TIMEOUT = 20_000

/**
 * Module-wide job queue that the pull handlers push their work onto.
 * Jobs start automatically and at most four of them run at the same time.
 */
const queue = new Queue({ concurrency: 4, autostart: true })

/**
 * Client-side replication between an RxDB collection and SurrealDB.
 *
 * Pulls read change rows from the `<db>_<collection>_logs` table; pushes write
 * either to `<db>_<collection>_master` (for collections listed in
 * `masterPush`) or to `<db>_<collection>_client`.
 *
 * NOTE(review): `masterPush`, `failedCount`, `dbName`, `rejectFn`,
 * `recordTimeout`, `getEventNotificationsDb` and `message$` are inherited from
 * `RxdbSyncSurreal`, which is not visible in this file — confirm their exact
 * contracts there.
 */
export class RxdbSyncSurrealClient extends RxdbSyncSurreal {
  /** One lock per Surreal table prefix; held for the duration of a single pull or push round. */
  private syncLock: Record<string, AwaitLock> = {}

  /**
   * Builds a replication state for one collection and wires its re-sync triggers.
   *
   * @param options Replication options. Fields read here: `collectionName`,
   *   `collection`, `pull` (`batchSize`, `modifier`), `push` (`batchSize`,
   *   `modifier`), `replicationIdentifier`, `live`, `retryTime`, `autoStart`.
   * @returns The `RxSurrealDBReplicationState` whose `start()` has been wrapped
   *   so that it also listens for change notifications of this collection.
   */
  async replicateSurrealDB<RxDocType>(options: any) {
    // TODO: detect the SurrealDB version and pick the matching upsert command
    const UPSERT_CMD = 'UPSERT'
    const surrealClient = this
    const canPushMaster = this.masterPush.includes(options.collectionName)
    // Every (re)creation of the replication starts without back-off.
    this.failedCount[options.collectionName] = 0
    // Table names are built with '_' in place of '-' in the collection name.
    const collectionName = this.dbName + '_' + (options.collectionName.includes('-') ? options.collectionName.replaceAll('-', '_') : options.collectionName)
    if (!this.syncLock[collectionName]) this.syncLock[collectionName] = new AwaitLock()
    const collection = options.collection
    const conflictHandler: RxConflictHandler<any> = collection.conflictHandler
    const primaryPath = options.collection.schema.primaryPath

    let replicationPrimitivesPull: ReplicationPullOptions<RxDocType, SurrealDBCheckPointType> | undefined

    if (options.pull) {
      replicationPrimitivesPull = {
        /**
         * Fetches the log rows created after the last checkpoint and converts
         * them into documents plus a new checkpoint.
         */
        async handler(lastPulledCheckpoint: SurrealDBCheckPointType | undefined, batchSize: number) {
          const randomSessionUUID = randomCouchString(8)
          const span = createSpan(`Pulling db client`, undefined, context.active())
          span?.setAttribute('storeId', `${posSync0()?.id!}`)
          span?.setAttribute('deviceId', getDeviceId())
          span?.setAttribute('collection', collectionName)
          span?.setAttribute('deviceName', deviceSetting0()?.name!)
          return runWithCurrentContext(span, async () => {
            // The lock is released in the `finally` of the queued job below.
            await surrealClient.syncLock[collectionName].acquireAsync()

            return new Promise((resolve, reject) => {
              queue.push(async () => {
                const execPromise = new Promise(async (resolve, reject) => {
                  try {
                    // Expose the reject function so the round can be aborted from outside this handler.
                    surrealClient.rejectFn = reject
                    const db = await SurrealClient.getSurrealClient(surrealClient.dbName)
                    const since = lastPulledCheckpoint ? Math.round(lastPulledCheckpoint.lwt) : 0
                    rnLog(`[client:sync:pull] ${randomSessionUUID} ${collectionName} sync since ${since}`)
                    span?.setAttribute('since', since)
                    // Shrink the batch after failures, but never below 1: `LIMIT 0` would
                    // return nothing and the checkpoint could never advance.
                    const _batchSize = Math.max(1, Math.round(batchSize / (surrealClient.failedCount[options.collectionName] + 1)))
                    const response = await db.query(
                      `SELECT * FROM ${collectionName}_logs WITH INDEX timestamp_idx WHERE created_at > ${since} AND outdated != true LIMIT ${_batchSize}`
                    )
                    span?.setAttribute('surrealPullResponse', JSON.stringify(response))
                    rnLog(`[client:sync:pull] ${randomSessionUUID} ${collectionName} surreal response ${JSON.stringify(response)}`)
                    let lastCheckPoint = since
                    let lastCheckPointMaster = lastPulledCheckpoint?.lwtMaster ? Math.round(lastPulledCheckpoint.lwtMaster) : 0
                    //@ts-ignore
                    const changes = response[0] as Array<any>
                    const documents: WithDeleted<RxDocType>[] = []
                    changes.forEach((change: any) => {
                      // Both checkpoints track the largest `created_at` seen so far.
                      if (lastCheckPoint < change.created_at) {
                        lastCheckPoint = change.created_at
                      }
                      if (change.is_master_commit && lastCheckPointMaster < change.created_at) {
                        lastCheckPointMaster = change.created_at
                      }
                      if (change.event !== 'DELETE') {
                        // Documents stored under `__raw` are base64-encoded JSON (see the push fallback).
                        const newDoc = !!change.doc?.__raw ? JSON.parse(atob(change.doc.__raw)) : change.doc
                        //@ts-ignore
                        documents.push(newDoc)
                      } else {
                        // Deletions are turned into tombstones.
                        const newDoc = {
                          updatedAt: dayjs(now()).unix(),
                          _id: change.doc._id,
                          _deleted: true,
                        }
                        // @ts-ignore
                        documents.push(newDoc)
                      }
                    })
                    // Keep only the last occurrence of every `_id`, preserving the original order.
                    const returnDocs = _.uniqBy(documents.reverse(), '_id').reverse()
                    if (returnDocs.length < batchSize && returnDocs.length > 0) {
                      setTimeout(() => {
                        // resync in next tick
                        replicationState.reSync()
                      }, 0)
                    }
                    rnLog(`[client:sync:pull] ${randomSessionUUID} ${collectionName} returnedDocs ${JSON.stringify(returnDocs)}`)
                    resolve({
                      documents: returnDocs,
                      checkpoint: {
                        lwt: lastCheckPoint,
                        lwtMaster: lastCheckPointMaster,
                      },
                    })
                  } catch (e) {
                    // The rejection is deliberately delayed by 300 ms.
                    setTimeout(() => {
                      reject(e)
                    }, 300)
                  }
                })
                try {
                  // The time budget grows with the number of recent failures.
                  const res = (await pTimeout(execPromise, {
                    milliseconds: SYNC_TIMEOUT * (surrealClient.failedCount[options.collectionName] + 1),
                  })) as ReplicationPullHandlerResult<RxDocType, SurrealDBCheckPointType>
                  rnLog(`[client:sync:pull] ${randomSessionUUID} ${collectionName} done`)
                  span?.end()
                  // A successful round lowers the back-off by one step.
                  if (surrealClient.failedCount[options.collectionName] > 0) surrealClient.failedCount[options.collectionName] -= 1
                  resolve(res)
                } catch (e: any) {
                  // `e` may be undefined when the round is rejected without a value, so every
                  // access is optional; otherwise this block would throw and `reject` below
                  // would never run, leaving the pull promise pending forever.
                  rnLog(`[client:sync:pull] ${randomSessionUUID} ${collectionName} error ${e?.message}`)
                  if (e?.message?.includes('timed out')) {
                    surrealClient.recordTimeout('pull')
                  }
                  span?.recordException(e)
                  span?.end()
                  // The back-off is capped at 7 steps.
                  if (surrealClient.failedCount[options.collectionName] < 7) surrealClient.failedCount[options.collectionName] += 1
                  captureException(e, { tags: { col: collectionName } })
                  console.log(`Client pull error ${collectionName}`, e)
                  reject(e)
                } finally {
                  surrealClient.rejectFn = null
                  surrealClient.syncLock[collectionName].release()
                }
              })
            })
          })
        },
        batchSize: ensureNotFalsy(options.pull).batchSize,
        modifier: ensureNotFalsy(options.pull).modifier,
      }
    }

    let replicationPrimitivesPush: ReplicationPushOptions<RxDocType> | undefined

    if (options.push) {
      replicationPrimitivesPush = {
        /**
         * Writes the given rows to SurrealDB and returns the documents that
         * were detected as conflicts.
         */
        async handler(rows: RxReplicationWriteToMasterRow<RxDocType>[]) {
          const randomSessionUUID = randomCouchString(8)
          rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} pushing ${JSON.stringify(rows)}`)
          const span = createSpan(`Pushing db client`, undefined, context.active())
          span?.setAttribute('storeId', `${posSync0()?.id!}`)
          span?.setAttribute('deviceId', getDeviceId())
          span?.setAttribute('collection', collectionName)
          span?.setAttribute('deviceName', deviceSetting0()?.name!)
          return runWithCurrentContext(span, async () => {
            // Both locks are released in the `finally` block at the end of this handler.
            await surrealClient.syncLock[collectionName].acquireAsync()
            getSyncLock(options.collectionName).tryAcquire()

            const execPromise = new Promise(async (resolve, reject) => {
              try {
                // Expose the reject function so the round can be aborted from outside this handler.
                surrealClient.rejectFn = reject
                const db = await SurrealClient.getSurrealClient(surrealClient.dbName)
                const conflicts: WithDeleted<RxDocType>[] = []
                // Comma-separated record ids of all pushed rows for the given table.
                const toIdList = (table: string): string =>
                  rows.map(row => `${RxdbSyncSurreal.convertToSurrealId(table, (row.newDocumentState as any)[primaryPath])}`).join(', ')
                const queryParamMaster = toIdList(`${collectionName}_master`)
                const queryParamClient = toIdList(`${collectionName}_client`)
                const docsResponse = await db.query(`
                  SELECT * FROM ${collectionName}_client WHERE id IN [${queryParamClient}];
                  SELECT * FROM ${collectionName}_master WHERE id IN [${queryParamMaster}];
                `)
                const docsClient = docsResponse?.[0] as Array<any>
                const clientDocMap = new Map<string, any>()
                const docsMaster = docsResponse?.[1] as Array<any>
                const masterDocMap = new Map<string, any>()
                // Index the remote rows by primary key, decoding base64 `__raw` payloads on the way.
                docsMaster.forEach(doc => {
                  masterDocMap.set(
                    doc.doc[primaryPath],
                    !!doc.doc?.__raw
                      ? {
                          ...doc,
                          doc: JSON.parse(atob(doc.doc.__raw)),
                        }
                      : doc
                  )
                })
                docsClient.forEach(doc => {
                  clientDocMap.set(
                    doc.doc[primaryPath],
                    !!doc.doc?.__raw
                      ? {
                          ...doc,
                          doc: JSON.parse(atob(doc.doc.__raw)),
                        }
                      : doc
                  )
                })
                const nonConflictRows: typeof rows = []

                await Promise.all(
                  rows.map(async (row: any) => {
                    const forkStateDoc = row.newDocumentState
                    const onlineClientDoc = clientDocMap.get(forkStateDoc[primaryPath])
                    const onlineMasterDoc = masterDocMap.get(forkStateDoc[primaryPath])
                    // Master pushers compare against the master row; everyone else prefers the
                    // client row and falls back to the master row when there is none.
                    const realMasterState = canPushMaster ? onlineMasterDoc?.doc : onlineClientDoc ? onlineClientDoc?.doc : onlineMasterDoc?.doc

                    const conflictHandlerResult = await conflictHandler(
                      {
                        realMasterState: realMasterState ? realMasterState : {},
                        newDocumentState: row.assumedMasterState ? row.assumedMasterState : {},
                      },
                      'couchdb-push-1'
                    )
                    rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} compare ${JSON.stringify(realMasterState)} ${JSON.stringify(row)} status ${conflictHandlerResult.isEqual}`)
                    // A row is written when the remote state equals the assumed state or when
                    // there is no remote state at all.
                    if (conflictHandlerResult.isEqual || !realMasterState /*|| import.meta.env.MODE === 'development'*/) {
                      nonConflictRows.push(row)
                    } else {
                      conflicts.push(realMasterState)
                    }
                  })
                )

                span?.setAttribute('conflicts', JSON.stringify(conflicts))
                let command = ''
                // One push id is shared by every row written in this round.
                const randomUUID = randomCouchString(8)
                for (const row of nonConflictRows) {
                  const localDocId = (row.newDocumentState as any)[primaryPath]
                  const docId = RxdbSyncSurreal.convertToSurrealId(canPushMaster ? `${collectionName}_master` : `${collectionName}_client`, localDocId)

                  if (canPushMaster) {
                    command += `
                      ${UPSERT_CMD} ${docId} CONTENT {
                        "master_updated_at": time::millis(time::now()),
                        "doc": ${JSON.stringify(row.newDocumentState)},
                        "need_commit": true,
                        "is_master_commit": false,
                        "is_client_commit": true,
                        "is_force_commit": true,
                        "device_id": "${getDeviceId()}"
                      };
                    `
                  } else {
                    const clientDoc = clientDocMap.get(localDocId)
                    const clientDocExists = !!clientDoc
                    const sendDoc: Record<any, any> = {
                      doc: flatClone(row.newDocumentState),
                      assumed_masterState: flatClone(row.assumedMasterState),
                      docId: localDocId,
                      _deleted: row.newDocumentState._deleted,
                      pushId: randomUUID,
                      device_id: getDeviceId(),
                    }
                    if (clientDocExists) {
                      // The update only applies while the remote row still carries the pushId read above.
                      command += `
                        UPDATE ${docId} CONTENT ${JSON.stringify(sendDoc)} WHERE pushId = "${clientDoc.pushId}";
                      `
                    } else {
                      sendDoc.id = localDocId
                      command += `
                      INSERT INTO ${collectionName}_client ${JSON.stringify(sendDoc)};
                    `
                    }
                  }
                }
                if (command.length) {
                  try {
                    const queryRes = await db.query(`
                      BEGIN TRANSACTION;
                        ${command}
                        CREATE ${surrealClient.getEventNotificationsDb()} SET table = '${collectionName}';
                        COMMIT TRANSACTION;
                    `)
                    rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} push response ${JSON.stringify(queryRes)}`)
                    // The last result belongs to the notification CREATE; every earlier result maps
                    // to the row at the same index. An empty result is reported as a conflict.
                    for (let i = 0; i < queryRes.length - 1; i++) {
                      //@ts-ignore
                      if (!queryRes[i]?.length) {
                        conflicts.push(nonConflictRows[i].assumedMasterState as WithDeleted<RxDocType>)
                      }
                    }
                  } catch (err: any) {
                    rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} push error ${err?.message}`)
                    if (err?.message?.includes('Parse error')) {
                      // Fallback for payloads the server could not parse: resend every row with the
                      // document wrapped as base64-encoded JSON under `__raw`.
                      // NOTE(review): this path always targets the `_client` table, even when
                      // `canPushMaster` is true — confirm that this is intended.
                      let command = ''
                      for (const row of nonConflictRows) {
                        const localDocId = (row.newDocumentState as any)[primaryPath]
                        const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_client`, localDocId)
                        const clientDoc = clientDocMap.get(localDocId)
                        const clientDocExists = !!clientDoc
                        const sendDoc: Record<any, any> = {
                          doc: {
                            _id: localDocId,
                            __raw: btoa(JSON.stringify(row.newDocumentState)),
                          },
                          assumed_masterState: {
                            __raw: btoa(JSON.stringify(row.assumedMasterState)),
                          },
                          docId: localDocId,
                          _deleted: row.newDocumentState._deleted,
                          pushId: randomUUID,
                          device_id: getDeviceId(),
                        }
                        if (clientDocExists) {
                          command += `
                            UPDATE ${docId} CONTENT ${JSON.stringify(sendDoc)} WHERE pushId = "${clientDoc.pushId}";
                          `
                        } else {
                          sendDoc.id = docId
                          command += `
                          INSERT INTO ${collectionName}_client ${JSON.stringify(sendDoc)};
                        `
                        }
                      }

                      const queryRes = await db.query(`
                        BEGIN TRANSACTION;
                          ${command}
                          CREATE ${surrealClient.getEventNotificationsDb()} SET table = '${collectionName}';
                          COMMIT TRANSACTION;
                      `)
                      span?.setAttribute('surrealPushResponse', JSON.stringify(queryRes))
                      for (let i = 0; i < queryRes.length - 1; i++) {
                        //@ts-ignore
                        if (!queryRes[i]?.length) {
                          conflicts.push(nonConflictRows[i].assumedMasterState as WithDeleted<RxDocType>)
                        }
                      }
                    } else {
                      throw err
                    }
                  }
                }
                resolve(conflicts)
              } catch (e) {
                // The rejection is deliberately delayed by 300 ms.
                setTimeout(() => {
                  reject(e)
                }, 300)
              }
            })

            try {
              // The time budget grows with the number of recent failures.
              const res = (await pTimeout(execPromise, {
                milliseconds: SYNC_TIMEOUT * (surrealClient.failedCount[options.collectionName] + 1),
              })) as WithDeleted<RxDocType>[]
              rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} done`)
              span?.setAttribute('surrealPushResponse', JSON.stringify(res))
              span?.end()
              // A successful round lowers the back-off by one step.
              if (surrealClient.failedCount[options.collectionName] > 0) surrealClient.failedCount[options.collectionName] -= 1
              return res
            } catch (e: any) {
              // `e` may be undefined when the round is rejected without a value.
              rnLog(`[client:sync:push] ${randomSessionUUID} ${collectionName} error ${e?.message}`)
              if (e?.message?.includes('timed out')) {
                surrealClient.recordTimeout('push')
              }
              span?.recordException(e)
              span?.end()
              // The back-off is capped at 7 steps.
              if (surrealClient.failedCount[options.collectionName] < 7) surrealClient.failedCount[options.collectionName] += 1
              captureException(e, { tags: { col: collectionName } })
              console.log(`Client push error ${collectionName}`, e)
              throw e
            } finally {
              releaseSyncLock(options.collectionName)
              surrealClient.rejectFn = null
              surrealClient.syncLock[collectionName].release()
            }
          })
        },
        batchSize: options.push.batchSize,
        modifier: ensureNotFalsy(options.push).modifier,
      }
    }

    const replicationState = new RxSurrealDBReplicationState<RxDocType>(
      options.replicationIdentifier,
      collection,
      replicationPrimitivesPull,
      replicationPrimitivesPush,
      options.live,
      options.retryTime,
      options.autoStart
    )

    // Re-sync whenever the `wakedUp` event fires.
    const wakedUpTrigger = () => {
      replicationState.reSync()
    }
    wakeUpEE.on('wakedUp', wakedUpTrigger)

    // Subscription to change notifications for this collection. It is created on the first
    // `start()` and torn down on cancellation, so repeated starts do not stack subscriptions
    // and a cancelled replication is no longer re-synced.
    let messageSubscription: { unsubscribe: () => void } | undefined

    replicationState.canceled$.subscribe(bool => {
      if (bool) {
        wakeUpEE.removeListener('wakedUp', wakedUpTrigger)
        messageSubscription?.unsubscribe()
        messageSubscription = undefined
      }
    })

    const startBefore = replicationState.start.bind(replicationState)
    replicationState.start = () => {
      if (!messageSubscription) {
        messageSubscription = this.message$.pipe(filter((table: string) => table === collectionName)).subscribe({
          next: () => {
            replicationState.reSync()
          },
        })
      }
      return startBefore()
    }

    return replicationState
  }
}

/** Module-level `RxdbSyncSurrealClient` instance, created once when this module is loaded. */
export const rxdbSurrealClient = new RxdbSyncSurrealClient()
