import { context } from '@opentelemetry/api'
import { captureException } from '@sentry/react'
import AwaitLock from 'await-lock'
import dayjs from 'dayjs'
import pTimeout from 'p-timeout'
import Queue from 'queue'
import {
  docStateToWriteDoc,
  ensureNotFalsy,
  randomCouchString,
  setCheckpoint,
  writeDocToDocState,
  type BulkWriteRow,
  type ById,
  type ReplicationPullHandlerResult,
  type ReplicationPullOptions,
  type ReplicationPushOptions,
  type RxCollection,
  type RxConflictHandler,
  type RxDocumentData,
  type RxReplicationHandler,
  type RxReplicationWriteToMasterRow,
  type WithDeleted,
} from 'rxdb'
import { filter } from 'rxjs'

import { deviceSetting0 } from '@/data/DeviceSettingSignal.ts'
import { completeReSync, posSync0, reSyncDb } from '@/data/PosSyncState.ts'
import { createSpan, runWithCurrentContext } from '@/lib/open-telemetry.ts'
import { rnLog } from '@/lib/rnlog.ts'
import { now } from '@/pos/logic/time-provider.ts'
import { getDeviceId } from '@/shared/getDeviceId.ts'
import SurrealClient from '@/shared/SurrealClient.ts'

import { RxdbSyncSurreal, RxSurrealDBReplicationState, type SurrealDBCheckPointType } from './rxdb-surreal-sync.ts'

// Module-wide job queue shared by every collection's pull handler; configured
// with `concurrency: 4` and `autostart: true`, so pushed jobs are picked up
// without an explicit start call.
const queue = new Queue({ autostart: true, concurrency: 4 })

// Base timeout (milliseconds) handed to pTimeout for one pull/push cycle.
// The handlers multiply it by `failedCount + 1` for the collection.
const SYNC_TIMEOUT = 15000

/**
 * Cheap equality probe for two document states: they are treated as equal
 * when their `updatedAt` values are strictly identical. No other field is
 * inspected.
 *
 * @param oldDoc - the state to compare against (must not be null/undefined)
 * @param newDoc - the candidate state (must not be null/undefined)
 * @returns `true` when both `updatedAt` values are `===`
 */
const quickCompare = (oldDoc: any, newDoc: any): boolean => {
  const previousStamp = oldDoc.updatedAt
  const nextStamp = newDoc.updatedAt
  return previousStamp === nextStamp
}

/**
 * Master-side replication between local RxDB collections and SurrealDB.
 *
 * The pull handler reads `<table>_logs`, resolves every change against the
 * local storage instance, upserts the outcome into `<table>_master` and writes
 * it locally. The push handler upserts local writes into `<table>_master`.
 * Both handlers hold the per-table `syncLock` for the whole cycle.
 */
export class RxdbSyncSurrealMaster<RxDocType> extends RxdbSyncSurreal {
  /** One lock per SurrealDB table name; serializes pull and push for that table. */
  private syncLock: Record<string, AwaitLock> = {}
  /** Per document id: `updatedAt` of the client document last reported as a conflict by the push handler. */
  private resolvedIds: Record<string, number> = {}
  private replicationHandlerByCollection: Map<string, RxReplicationHandler<any, any>> = new Map()

  /**
   * Turns `{ document }` rows into storage write rows, in place.
   *
   * For every row the currently stored document (deleted ones included) is
   * attached as `row.previous`, and `row.document` is replaced by the result of
   * `docStateToWriteDoc`.
   *
   * @param forceCommit - rows shaped `{ document }`; mutated in place
   * @param primaryPath - name of the primary-key field of the collection
   * @param collection - collection whose storage instance is queried
   * @param databaseInstanceToken - token forwarded to `docStateToWriteDoc`
   */
  async fillPreviousForForceCommit(forceCommit: any[], primaryPath: string, collection: RxCollection, databaseInstanceToken: string) {
    // Nothing to enrich: skip the storage round trip.
    if (forceCommit.length === 0) return

    const ids = [...new Set(forceCommit.map(row => String((row.document as any)[primaryPath])))]

    const storedDocs = await collection.storageInstance.findDocumentsById(ids, true)
    const storedById = new Map<string, RxDocumentData<RxDocType>>()
    storedDocs.forEach(doc => storedById.set((doc as any)[primaryPath], doc))

    forceCommit.forEach(row => {
      const previous = storedById.get((row.document as any)[primaryPath])
      row.previous = previous
      // `previous` is forwarded (as getMasterWriteRows does) so the write
      // document is derived from the stored one instead of from nothing.
      row.document = docStateToWriteDoc(databaseInstanceToken, false, false, row.document, previous)
    })
  }

  /**
   * Sorts pulled change rows into local write rows and conflicts.
   *
   * Per document id (the last row wins when an id occurs more than once):
   * - not stored locally: written as a new document;
   * - stored locally but the row has no `assumedMasterState`: the stored state
   *   is reported as a conflict;
   * - collection listed in `hasConflictHandler`: if `updatedAt` of the stored
   *   state and the assumed state differ, the collection's conflict handler
   *   decides; its `documentData` is written when it reports a difference;
   * - otherwise: written when the stored state equals the assumed state or the
   *   online master document, reported as a conflict if it equals neither.
   *
   * @param rows - rows shaped `{ newDocumentState, assumedMasterState? }`
   * @param masterDocMap - records of the online master table, keyed by document id
   * @param primaryPath - name of the primary-key field of the collection
   * @param collection - collection providing storage and conflict handler
   * @param databaseInstanceToken - token forwarded to `docStateToWriteDoc`
   * @param collectionName - name looked up in `hasConflictHandler`
   * @returns `writeRows` for the local storage and `conflicts` (stored states)
   */
  async getMasterWriteRows(rows: any[], masterDocMap: Map<string, any>, primaryPath: string, collection: RxCollection, databaseInstanceToken: string, collectionName: string) {
    const hasAttachments = false
    const keepMeta = false
    const rowById: ById<RxReplicationWriteToMasterRow<RxDocType>> = {}
    rows.forEach(row => {
      const docId: string = (row.newDocumentState as any)[primaryPath]
      rowById[docId] = row
    })
    const ids = Object.keys(rowById)

    const masterDocsStateList = await collection.storageInstance.findDocumentsById(ids, true)
    const masterDocsState = new Map<string, RxDocumentData<RxDocType>>()
    masterDocsStateList.forEach(doc => masterDocsState.set((doc as any)[primaryPath], doc))
    const conflicts: WithDeleted<RxDocType>[] = []
    const writeRows: BulkWriteRow<RxDocType>[] = []
    await Promise.all(
      Object.entries(rowById).map(async ([id, row]) => {
        const masterState = masterDocsState.get(id)
        const onlineMasterState = masterDocMap.get(id)?.doc

        if (!masterState) {
          writeRows.push({
            document: docStateToWriteDoc(databaseInstanceToken, hasAttachments, keepMeta, row.newDocumentState),
          })
          return
        }

        if (!row.assumedMasterState) {
          conflicts.push(writeDocToDocState(masterState, hasAttachments, keepMeta))
          return
        }

        if (this.hasConflictHandler.includes(collectionName)) {
          // Typed explicitly: the handler result may carry `documentData`,
          // which the inferred `{ isEqual: boolean }` would not allow reading.
          let compareResult: { isEqual: boolean; documentData?: any } = {
            isEqual: quickCompare(writeDocToDocState(masterState, hasAttachments, keepMeta), ensureNotFalsy(row.assumedMasterState)),
          }
          if (!compareResult.isEqual) {
            compareResult = await collection.conflictHandler(
              {
                realMasterState: writeDocToDocState(masterState, hasAttachments, keepMeta),
                newDocumentState: ensureNotFalsy(row.newDocumentState),
              },
              'masterWrite-online'
            )
          }
          // Equal: take the incoming state. Different: take what the handler resolved.
          const stateToWrite = compareResult.isEqual ? row.newDocumentState : compareResult.documentData
          writeRows.push({
            previous: masterState,
            document: docStateToWriteDoc(databaseInstanceToken, hasAttachments, keepMeta, stateToWrite, masterState),
          })
          return
        }

        const matchesAssumed = (
          await collection.conflictHandler(
            {
              realMasterState: writeDocToDocState(masterState, hasAttachments, keepMeta),
              newDocumentState: ensureNotFalsy(row.assumedMasterState),
            },
            'masterWrite-online'
          )
        ).isEqual
        // Second comparison only runs when the first one did not match.
        const matchesKnownMaster =
          matchesAssumed ||
          (
            await collection.conflictHandler(
              {
                realMasterState: writeDocToDocState(masterState, hasAttachments, keepMeta),
                newDocumentState: onlineMasterState ? onlineMasterState : {},
              },
              'masterWrite-online'
            )
          ).isEqual

        if (matchesKnownMaster) {
          writeRows.push({
            previous: masterState,
            document: docStateToWriteDoc(databaseInstanceToken, hasAttachments, keepMeta, row.newDocumentState, masterState),
          })
        } else {
          conflicts.push(writeDocToDocState(masterState, hasAttachments, keepMeta))
        }
      })
    )

    return {
      writeRows,
      conflicts,
    }
  }

  /**
   * Builds the replication state for one collection.
   *
   * Reads from `options`: `collectionName`, `collection`, `pull.modifier`,
   * `push.batchSize`, `push.modifier`, `replicationIdentifier`, `live`,
   * `retryTime` and `autoStart`.
   *
   * Each pull/push cycle is bounded by `SYNC_TIMEOUT * (failedCount + 1)`;
   * `failedCount` for the collection grows on failure (up to 7) and shrinks on
   * success.
   *
   * @param options - replication options as listed above
   * @returns the replication state; its `start()` additionally subscribes to
   *   `message$` and triggers `reSync()` for this table or for `'resync'`
   */
  async replicateSurrealDB<RxDocType>(options: any): Promise<RxSurrealDBReplicationState<RxDocType>> {
    //todo: get surreal version and get upsert command
    const UPSERT_CMD = 'UPSERT'
    const surrealMaster = this
    this.failedCount[options.collectionName] = 0
    // Table name: database name prefix, and `-` in the collection name replaced by `_`.
    const collectionName = this.dbName + '_' + options.collectionName.replaceAll('-', '_')
    if (!this.syncLock[collectionName]) this.syncLock[collectionName] = new AwaitLock()
    const collection = options.collection
    const conflictHandler: RxConflictHandler<any> = collection.conflictHandler
    const primaryPath = options.collection.schema.primaryPath
    const databaseInstanceToken = options.collection.database.token

    // Documents that could not be sent as SurrealQL are stored as
    // `{ _id, __raw: base64(JSON) }`; unwrap those, pass everything else through.
    const decodeRaw = (doc: any) => (doc?.__raw ? JSON.parse(atob(doc.__raw)) : doc)

    /**
     * Loads the `_master` records for the given ids, plus the `_client` records
     * when the collection is listed in `hasConflictHandler`. Both maps are
     * keyed by the document's primary key.
     */
    const getMasterDoc = async (docIds: string[]) => {
      const db = await SurrealClient.getSurrealClient(this.dbName)
      const needPullClient = this.hasConflictHandler.includes(options.collectionName)
      const toRecordIdList = (table: string) => docIds.map(id => `${RxdbSyncSurreal.convertToSurrealId(table, id)}`).join(', ')

      let query = `SELECT * FROM ${collectionName}_master WHERE id IN [${toRecordIdList(`${collectionName}_master`)}];`
      if (needPullClient) {
        query += `SELECT * FROM ${collectionName}_client WHERE id IN [${toRecordIdList(`${collectionName}_client`)}];`
      }

      const masterDocsResponse = await db.query(query)
      const docsMaster = masterDocsResponse?.[0] as Array<any>
      const docsClient = masterDocsResponse?.[1] as Array<any>

      const indexByPrimaryKey = (records: Array<any>) => {
        const byId = new Map<string, any>()
        records.forEach(record => {
          byId.set(record.doc[primaryPath], record.doc?.__raw ? { ...record, doc: decodeRaw(record.doc) } : record)
        })
        return byId
      }

      const masterDocMap = indexByPrimaryKey(docsMaster)
      const clientDocMap = docsClient ? indexByPrimaryKey(docsClient) : new Map<string, any>()

      return { masterDocMap, clientDocMap }
    }

    const replicationPrimitivesPull: ReplicationPullOptions<RxDocType, SurrealDBCheckPointType> = {
      /**
       * Pulls up to `batchSize` log entries newer than the checkpoint, mirrors
       * the resolved result to the master table and writes it to the local
       * storage. Always resolves with an empty `documents` list because the
       * local writes are done here directly.
       */
      async handler(lastPulledCheckpoint: SurrealDBCheckPointType | undefined, batchSize: number) {
        const randomUUID = randomCouchString(8)
        const span = createSpan(`Pulling db master`, undefined, context.active())
        span?.setAttribute('storeId', `${posSync0()?.id!}`)
        span?.setAttribute('deviceId', getDeviceId())
        span?.setAttribute('collection', collectionName)
        span?.setAttribute('deviceName', deviceSetting0()?.name!)
        return runWithCurrentContext(span, async () => {
          await surrealMaster.syncLock[collectionName].acquireAsync()

          // The lock is normally released by the queued job below. If the client
          // cannot be obtained that job never runs, so release here; otherwise
          // every later cycle for this table would wait on the lock forever.
          const connect = async () => {
            try {
              return await SurrealClient.getSurrealClient(surrealMaster.dbName)
            } catch (e: any) {
              surrealMaster.syncLock[collectionName].release()
              span?.recordException(e)
              span?.end()
              throw e
            }
          }
          const db = await connect()

          return new Promise((resolve, reject) => {
            queue.push(async () => {
              const execPromise = new Promise(async (resolve, reject) => {
                try {
                  surrealMaster.rejectFn = reject
                  const since = lastPulledCheckpoint ? Math.round(lastPulledCheckpoint.lwt) : 0
                  rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} sync since ${since}`)
                  span?.setAttribute('since', since)
                  const response = await db.query(`SELECT * FROM ${collectionName}_logs WHERE created_at > ${since} AND outdated != true ORDER BY created_at LIMIT ${batchSize}`)
                  span?.setAttribute('surrealPullResponse', JSON.stringify(response))
                  rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} surreal response ${JSON.stringify(response)}`)
                  let lastCheckPoint = since
                  //@ts-ignore
                  const changes = response[0] as Array<any>
                  const forceCommit: any[] = []
                  const deleteCommit: any[] = []

                  const docIds: string[] = []
                  const documents: any[] = []
                  changes.forEach((change: any) => {
                    // The checkpoint advances over every entry, including skipped ones.
                    if (lastCheckPoint < change.created_at) {
                      lastCheckPoint = change.created_at
                    }
                    // Entries written by a master are skipped unless this collection is being re-synced.
                    if (change.is_master_commit && !reSyncDb.includes(options.collectionName)) return
                    docIds.push(change.docId)
                    if (change.is_force_commit) {
                      forceCommit.push({
                        document: decodeRaw(change.doc),
                      })
                    } else {
                      if (change.event !== 'DELETE') {
                        documents.push({
                          newDocumentState: decodeRaw(change.doc),
                          assumedMasterState: decodeRaw(change.assumed_masterState),
                        })
                      } else {
                        const newDoc = {
                          updatedAt: dayjs(now()).valueOf(),
                          _id: change.doc._id,
                          _deleted: true,
                        }
                        deleteCommit.push({
                          document: newDoc,
                        })
                      }
                    }
                  })

                  const { masterDocMap } = await getMasterDoc(docIds)

                  const { writeRows, conflicts } = await surrealMaster.getMasterWriteRows(documents, masterDocMap, primaryPath, collection, databaseInstanceToken, options.collectionName)

                  rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} writeRows ${JSON.stringify(writeRows)}`)
                  rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} conflicts ${JSON.stringify(conflicts)}`)

                  let command = ''
                  for (const row of writeRows) {
                    const localDocId = (row.document as any)[primaryPath]
                    const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_master`, localDocId)
                    command += `
                      ${UPSERT_CMD} ${docId} CONTENT {
                        "master_updated_at": time::millis(time::now()),
                        "doc": ${JSON.stringify(writeDocToDocState(row.document, false, false))},
                        "need_commit": true,
                        "is_master_commit": true,
                        "device_id": "${getDeviceId()}"
                      };
                    `
                  }
                  for (const row of conflicts) {
                    const localDocId = (row as any)[primaryPath]
                    const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_master`, localDocId)
                    command += `
                      ${UPSERT_CMD} ${docId} CONTENT {
                        "master_updated_at": time::millis(time::now()),
                        "doc": ${JSON.stringify(row)},
                        "need_commit": true,
                        "is_master_commit": true,
                        "device_id": "${getDeviceId()}"
                      };
                    `
                  }
                  if (command.length) {
                    command += `CREATE ${surrealMaster.getEventNotificationsDb()} SET table = "${collectionName}";`
                    try {
                      await db.query(`
                        BEGIN TRANSACTION;
                        ${command}
                        COMMIT TRANSACTION;
                      `)
                    } catch (err: any) {
                      rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} pull res error ${err?.message}`)
                      if (err?.message?.includes('Parse error')) {
                        // Retry with the documents wrapped as base64 so their content
                        // cannot break the statement.
                        // NOTE(review): only `conflicts` are re-sent here, not `writeRows`,
                        // and no notification record is created - confirm this is intended.
                        // NOTE(review): btoa throws on characters outside Latin-1 - confirm
                        // the runtime's btoa/atob handle the documents stored here.
                        let rawCommand = ''
                        for (const row of conflicts) {
                          const localDocId = (row as any)[primaryPath]
                          const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_master`, localDocId)
                          rawCommand += `
                            ${UPSERT_CMD} ${docId} CONTENT {
                              "master_updated_at": time::millis(time::now()),
                              "doc": {
                                "_id": "${localDocId}",
                                "__raw": "${btoa(JSON.stringify(row))}"
                              },
                              "need_commit": true,
                              "is_master_commit": true,
                              "device_id": "${getDeviceId()}"
                            };
                          `
                        }
                        await db.query(`
                          BEGIN TRANSACTION;
                          ${rawCommand}
                          COMMIT TRANSACTION;
                        `)
                      } else {
                        return reject(err)
                      }
                    }
                  }

                  await surrealMaster.fillPreviousForForceCommit(forceCommit, primaryPath, collection, databaseInstanceToken)
                  await surrealMaster.fillPreviousForForceCommit(deleteCommit, primaryPath, collection, databaseInstanceToken)
                  writeRows.push(...deleteCommit)
                  if (writeRows.length > 0) {
                    const result = await collection.storageInstance.bulkWrite(writeRows, 'replication-master-write')
                    rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} write res ${JSON.stringify(result)}`)
                    // 409 (conflict) is tolerated; anything else aborts the cycle.
                    result.error.forEach((err: any) => {
                      if (err.status !== 409) {
                        throw new Error('non conflict error')
                      }
                    })
                  }
                  if (forceCommit.length > 0) {
                    const result = await collection.storageInstance.bulkWrite(forceCommit, await replicationState.internalReplicationState!.downstreamBulkWriteFlag)
                    rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} force write res ${JSON.stringify(result)}`)
                    result.error.forEach((err: any) => {
                      if (err.status !== 409) {
                        throw new Error('non conflict error')
                      }
                    })
                  }

                  setCheckpoint(replicationState.internalReplicationState!, 'down', {
                    lwt: lastCheckPoint,
                  })
                  if (writeRows.length > 0) {
                    setTimeout(() => {
                      // resync in next tick
                      replicationState.reSync()
                    }, 0)
                  } else {
                    if (reSyncDb.includes(options.collectionName)) {
                      completeReSync(options.collectionName)
                    }
                  }
                  resolve({
                    documents: [],
                    checkpoint: {
                      lwt: lastCheckPoint,
                      lwtMaster: lastCheckPoint,
                    },
                  })
                } catch (e) {
                  setTimeout(() => {
                    reject(e)
                  }, 300)
                }
              })
              try {
                const res = (await pTimeout(execPromise, {
                  milliseconds: SYNC_TIMEOUT * (surrealMaster.failedCount[options.collectionName] + 1),
                })) as ReplicationPullHandlerResult<RxDocType, SurrealDBCheckPointType>
                span?.end()
                rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} done`)
                if (surrealMaster.failedCount[options.collectionName] > 0) surrealMaster.failedCount[options.collectionName] -= 1
                resolve(res)
              } catch (e: any) {
                rnLog(`[master:sync:pull] ${randomUUID} ${collectionName} error ${e.message}`)
                if (e.message?.includes('timed out')) {
                  surrealMaster.recordTimeout('pull')
                }
                span?.recordException(e)
                span?.end()
                if (surrealMaster.failedCount[options.collectionName] < 7) surrealMaster.failedCount[options.collectionName] += 1
                captureException(e, { tags: { col: collectionName, master: true } })
                console.log(`Master pull error ${collectionName}`, e)
                reject(e)
              } finally {
                surrealMaster.rejectFn = null
                surrealMaster.syncLock[collectionName].release()
              }
            })
          })
        })
      },
      modifier: options.pull.modifier,
    }

    const replicationPrimitivesPush: ReplicationPushOptions<RxDocType> = {
      /**
       * Upserts local rows into the master table and resolves with the
       * conflicts found.
       *
       * A row is reported as a conflict (and not written) when a client record
       * exists whose `updatedAt` differs from the one remembered in
       * `resolvedIds`. A row is skipped when the conflict handler reports it
       * equal to the current master record.
       */
      async handler(rows: RxReplicationWriteToMasterRow<RxDocType>[]) {
        const randomUUID = randomCouchString(8)
        rnLog(`[master:sync:push] ${randomUUID} ${collectionName} rows ${JSON.stringify(rows)}`)
        const span = createSpan(`Pushing db master`, undefined, context.active())
        span?.setAttribute('storeId', `${posSync0()?.id!}`)
        span?.setAttribute('deviceId', getDeviceId())
        span?.setAttribute('collection', collectionName)
        span?.setAttribute('deviceName', deviceSetting0()?.name!)
        return runWithCurrentContext(span, async () => {
          await surrealMaster.syncLock[collectionName].acquireAsync()
          // Everything after acquiring the lock lives inside this try so the
          // `finally` releases it even when the client cannot be obtained.
          try {
            const db = await SurrealClient.getSurrealClient(surrealMaster.dbName)
            const conflicts: WithDeleted<RxDocType>[] = []

            const execPromise = new Promise(async (resolve, reject) => {
              try {
                surrealMaster.rejectFn = reject
                const docIds: string[] = rows.map(row => (row.newDocumentState as any)[primaryPath])
                const { masterDocMap, clientDocMap } = await getMasterDoc(docIds)

                let command = ''
                // Rows selected for writing; reused by the raw fallback below so both
                // attempts write exactly the same set of documents.
                const rowsToCommit: { localDocId: string; newDocumentState: WithDeleted<RxDocType> }[] = []
                for (const row of rows) {
                  const localDocId = (row.newDocumentState as any)[primaryPath]
                  const masterDoc = masterDocMap.get(localDocId)
                  const clientDoc = clientDocMap.get(localDocId)
                  if (!!clientDoc && clientDoc.doc?.updatedAt !== surrealMaster.resolvedIds[localDocId]) {
                    // Remember which client version was reported, so the re-push
                    // after conflict resolution is let through.
                    surrealMaster.resolvedIds[localDocId] = clientDoc.doc?.updatedAt
                    conflicts.push(clientDoc.doc)
                  } else {
                    let needCommit = true
                    if (masterDoc) {
                      const conflictHandlerResult = await conflictHandler(
                        {
                          realMasterState: masterDoc.doc,
                          newDocumentState: row.newDocumentState,
                        },
                        'couchdb-push-1'
                      )
                      if (conflictHandlerResult.isEqual) {
                        needCommit = false
                      }
                    }

                    rnLog(`[master:sync:push] ${JSON.stringify(row)} ${JSON.stringify(masterDoc)} check equal: ${needCommit}`)
                    if (needCommit) {
                      const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_master`, localDocId)
                      command += `
                      ${UPSERT_CMD} ${docId} CONTENT {
                        "master_updated_at": time::millis(time::now()),
                        "doc": ${JSON.stringify(row.newDocumentState)},
                        "need_commit": true,
                        "is_master_commit": true,
                        "device_id": "${getDeviceId()}"
                      };
                    `
                      rowsToCommit.push({ localDocId, newDocumentState: row.newDocumentState })
                    }
                  }
                }
                if (command.length) {
                  try {
                    const surrealRes = await db.query(`
                    BEGIN TRANSACTION;
                    ${command}
                    CREATE ${surrealMaster.getEventNotificationsDb()} SET table = '${collectionName}';
                    COMMIT TRANSACTION;
                  `)
                    rnLog(`[master:sync:push] ${randomUUID} ${collectionName} surreal res ${JSON.stringify(surrealRes)}`)
                    rowsToCommit.forEach(({ localDocId }) => delete surrealMaster.resolvedIds[localDocId])
                    span?.setAttribute('surrealPushResponse', JSON.stringify(surrealRes))
                  } catch (err: any) {
                    rnLog(`[master:sync:push] ${randomUUID} ${collectionName} surreal error ${err?.message}`)
                    span?.recordException(err)
                    if (err?.message?.includes('Parse error')) {
                      // Retry once with the documents wrapped as base64. Only rows that
                      // were selected above are sent: rows reported as conflicts must not
                      // reach the master table, and the query is issued a single time
                      // after all statements have been assembled.
                      // NOTE(review): unlike the first attempt, no notification record is
                      // created and `resolvedIds` is left untouched - confirm intended.
                      let rawCommand = ''
                      for (const { localDocId, newDocumentState } of rowsToCommit) {
                        const docId = RxdbSyncSurreal.convertToSurrealId(`${collectionName}_master`, localDocId)
                        rawCommand += `
                          ${UPSERT_CMD} ${docId} CONTENT {
                            "master_updated_at": time::millis(time::now()),
                            "doc": {
                              _id: "${localDocId}",
                              __raw: "${btoa(JSON.stringify(newDocumentState))}"
                            },
                            "need_commit": true,
                            "is_master_commit": true,
                            "device_id": "${getDeviceId()}"
                          };
                        `
                      }
                      await db.query(`
                        BEGIN TRANSACTION;
                        ${rawCommand}
                        COMMIT TRANSACTION;
                      `)
                    } else {
                      return reject(err)
                    }
                  }
                }
                span?.setAttribute('conflicts', JSON.stringify(conflicts))
                resolve(conflicts)
              } catch (e) {
                setTimeout(() => {
                  reject(e)
                }, 300)
              }
            })

            const res = await pTimeout(execPromise, {
              milliseconds: SYNC_TIMEOUT * (surrealMaster.failedCount[options.collectionName] + 1),
            })
            rnLog(`[master:sync:push] ${randomUUID} ${collectionName} done`)
            span?.end()
            if (surrealMaster.failedCount[options.collectionName] > 0) surrealMaster.failedCount[options.collectionName] -= 1
            return res
          } catch (e: any) {
            rnLog(`[master:sync:push] ${randomUUID} ${collectionName} error ${e.message}`)
            if (e.message?.includes('timed out')) {
              surrealMaster.recordTimeout('push')
            }
            span?.recordException(e)
            span?.end()
            if (surrealMaster.failedCount[options.collectionName] < 7) surrealMaster.failedCount[options.collectionName] += 1
            console.log(`Master push error ${collectionName}`, e.message)
            captureException(e, { tags: { col: collectionName, master: true } })
            //@ts-ignore
            console.log('Error', e.message)
            throw e
          } finally {
            surrealMaster.rejectFn = null
            surrealMaster.syncLock[collectionName].release()
          }
        })
      },
      batchSize: options.push.batchSize,
      modifier: options.push.modifier,
    }

    const replicationState = new RxSurrealDBReplicationState<RxDocType>(
      options.replicationIdentifier,
      collection,
      replicationPrimitivesPull,
      replicationPrimitivesPush,
      options.live,
      options.retryTime,
      options.autoStart
    )

    // Wrap start() so that a message naming this table (or 'resync') triggers a reSync.
    // NOTE(review): the subscription is never unsubscribed and a new one is added on
    // every start() call - confirm start() runs once per replication state.
    const startBefore = replicationState.start.bind(replicationState)
    replicationState.start = () => {
      ;(async () => {
        const observable = this.message$.pipe(filter((table: string) => table === collectionName || table === 'resync'))
        observable.subscribe({
          next: (value: any) => {
            replicationState.reSync()
          },
        })
      })()
      return startBefore()
    }

    return replicationState
  }
}

/** Module-level instance of the master replication helper, created when this module is loaded. */
export const rxdbSurrealMaster = new RxdbSyncSurrealMaster()
