import isObject from "lodash/isObject";
import firestore from "src/services/firestore";
import { Client, Thread } from "@langchain/langgraph-sdk";
import { LangChainMessage } from "@assistant-ui/react-langgraph";
import { StreamEvent } from "@langchain/langgraph-sdk/dist/types";

import { showDevelopmentError } from "src/utils";
import { getTimestamps } from "../utils";
import { COLLECTION_IDS } from "../constants";

// Inner imports
import * as schemas from "./threadsSchema";

// Settings read from `process.env`. Each one falls back to an empty string
// when the variable is undefined.
const {
  // Sent as `configurable.apiURL` in the run configuration of `streamMessage`.
  REACT_APP_API_BASE_URL = "",
  // Sent as `configurable.model_name` in the run configuration of `streamMessage`.
  REACT_APP_LANGGRAPH_MODEL_NAME = "",
  // Passed as `apiKey` to every LangGraph client built by `createClient`.
  REACT_APP_LANGGRAPH_PERSONAL_TOKEN = "",
} = process.env;

/**
 * Builds a new LangGraph SDK client for the given agent deployment.
 *
 * @param agentUrl - URL handed to the SDK as `apiUrl`.
 * @returns A fresh `Client` that uses `REACT_APP_LANGGRAPH_PERSONAL_TOKEN`
 * as its API key.
 */
const createClient = (agentUrl: string): Client => {
  const clientOptions = {
    apiUrl: agentUrl,
    apiKey: REACT_APP_LANGGRAPH_PERSONAL_TOKEN,
  };

  return new Client(clientOptions);
};

/**
 * Fetches a single thread from the LangGraph API.
 *
 * @param threadId - Identifier of the thread to fetch.
 * @param agentUrl - URL of the agent deployment to query.
 * @returns The promise produced by `client.threads.get`; it rejects when the
 * SDK call fails.
 */
export const getThreadData = async (
  threadId: Thread.Data["id"],
  agentUrl: string,
): Promise<
  Thread<Record<"messages", unknown>[] | Record<"messages", unknown>>
> => {
  const { threads: remoteThreads } = createClient(agentUrl);

  return remoteThreads.get(threadId);
};

/**
 * Loads the threads authored by a user and enriches each one with the
 * configuration stored on the matching LangGraph thread.
 *
 * Threads are read from Firestore (documents whose `authorId` equals `userId`
 * and whose `deletedAt` is `null`). Documents that fail schema validation are
 * reported through `showDevelopmentError` and skipped. Remote metadata is
 * fetched for every remaining thread in parallel; a failed fetch is reported
 * the same way and leaves that thread with an empty configuration.
 *
 * @param params.userId - Author whose threads should be returned.
 * @param params.agentUrl - URL of the agent deployment to query for metadata.
 * @returns The validated threads, each with `messages: []` and a
 * `configuration` copied from the remote thread's `config.configurable`
 * (or `{}` when unavailable).
 */
export const getThreadsByUserId = async ({
  userId,
  agentUrl,
}: {
  userId: User.Data["id"];
  agentUrl: string;
}): Promise<Thread.Data[]> => {
  const collection = await firestore()
    .collection(COLLECTION_IDS.threads)
    .where("authorId", "==", userId)
    .where("deletedAt", "==", null)
    .get();

  const threads: Thread.Data[] = [];

  for (const doc of collection.docs) {
    try {
      const validatedThread = schemas.threadSchema.validateSync(doc.data());

      const thread = {
        ...validatedThread,
        id: doc.id,
        messages: [],
        configuration: {},
      };

      threads.push(thread);
    } catch (error) {
      const errorTitle = `THREAD VALIDATION ERROR (${doc.id})`;

      showDevelopmentError({ error, additionalTexts: [errorTitle] });
    }
  }

  // allSettled keeps one failing thread from discarding the whole list; the
  // results come back in the same order as `threads`.
  const threadsMetadataResult = await Promise.allSettled(
    threads.map(({ id }) => getThreadData(id, agentUrl)),
  );

  const threadsMetadata: Record<
    Thread.Data["id"],
    Thread.Data["configuration"]
  > = {};

  threadsMetadataResult.forEach((promiseResult, index) => {
    if (promiseResult.status === "rejected") {
      // Surface the failure instead of dropping it silently; the thread is
      // still returned, just without remote configuration.
      const errorTitle = `THREAD METADATA ERROR (${threads[index]?.id})`;

      showDevelopmentError({
        error: promiseResult.reason,
        additionalTexts: [errorTitle],
      });

      return;
    }

    const { value: threadMetadata } = promiseResult;

    if (
      "config" in threadMetadata &&
      isObject(threadMetadata.config) &&
      "configurable" in threadMetadata.config &&
      isObject(threadMetadata.config.configurable)
    )
      threadsMetadata[threadMetadata.thread_id] = {
        ...threadMetadata.config.configurable,
      };
  });

  // Spreading `undefined` yields `{}`, so threads without metadata keep an
  // empty configuration.
  for (const thread of threads)
    thread.configuration = { ...threadsMetadata[thread.id] };

  return threads;
};

/**
 * Creates a thread on the LangGraph API and stores its record in Firestore
 * under the same id.
 *
 * The Firestore document receives every supplied field except `messages` and
 * `agentUrl`, plus fresh `createdAt` / `updatedAt` timestamps.
 *
 * @param params.messages - Messages echoed back on the returned thread; they
 * are not written to Firestore.
 * @param params.agentUrl - URL of the agent deployment that hosts the thread.
 * @returns The stored payload together with the new `id`, `messages`, and,
 * when the remote thread exposes an object at `config.configurable`, a copy
 * of it as `configuration`.
 */
export const createThread = async ({
  messages,
  agentUrl,
  ...rest
}: Store.CreateEntity<Thread.Data> & {
  agentUrl: string;
}): Promise<Thread.Data> => {
  const timestamps = getTimestamps();

  const _payload = {
    ...rest,
    createdAt: timestamps.createdAt,
    updatedAt: timestamps.updatedAt,
  };

  const clientThread = await createClient(agentUrl).threads.create();

  const threadId = clientThread.thread_id;

  const threadRef = firestore()
    .collection(COLLECTION_IDS.threads)
    .doc(threadId);

  await threadRef.set(_payload);

  const thread = { ..._payload, id: threadId, messages };

  // Without a usable `config.configurable` object there is nothing to copy.
  if (
    !("config" in clientThread) ||
    !isObject(clientThread.config) ||
    !("configurable" in clientThread.config) ||
    !isObject(clientThread.config.configurable)
  )
    return thread;

  thread.configuration = { ...clientThread.config.configurable };

  return thread;
};

/**
 * Merges the given changes into one thread document in Firestore and stamps
 * it with a fresh `updatedAt`.
 *
 * @param params.id - Id of the thread document to update.
 * @param params.changes - Fields to merge into the document.
 * @returns The same `id` with the changes that were written, including
 * `updatedAt`.
 */
export const updateThread = async ({
  id,
  changes,
}: Store.UpdateEntity<Thread.Data>): Promise<
  Store.UpdateEntity<Thread.Data>
> => {
  const timestamps = getTimestamps();

  const stampedChanges = { ...changes, updatedAt: timestamps.updatedAt };

  const threadRef = firestore().collection(COLLECTION_IDS.threads).doc(id);

  // `merge: true` leaves fields that are not part of the changes untouched.
  await threadRef.set(stampedChanges, { merge: true });

  return { id, changes: stampedChanges };
};

/**
 * Applies several thread updates in a single Firestore batch, stamping each
 * one with the same `updatedAt`.
 *
 * @param payload - Update entries, each holding a thread `id` and its
 * `changes`.
 * @returns One entry per input, in input order, containing the changes that
 * were written (including `updatedAt`).
 */
export const updateThreads = async (
  payload: Store.UpdateEntity<Thread.Data>[],
): Promise<Store.UpdateEntity<Thread.Data>[]> => {
  const { updatedAt } = getTimestamps();

  const batch = firestore().batch();

  const updatedPayload = payload.map(
    ({ id, changes }): Store.UpdateEntity<Thread.Data> => {
      const stampedChanges = { ...changes, updatedAt };

      const threadRef = firestore().collection(COLLECTION_IDS.threads).doc(id);

      // Queue the merge; nothing is written until the batch is committed.
      batch.set(threadRef, stampedChanges, { merge: true });

      return { id, changes: stampedChanges };
    },
  );

  await batch.commit();

  return updatedPayload;
};

/**
 * Merges the same changes into every thread of a company that was written by
 * the given author, using a single Firestore batch.
 *
 * @param params.changes - Fields to merge into each matching thread.
 * @param params.authorId - Author whose threads are updated.
 * @param companyId - Company the threads must belong to.
 * @returns One entry per matching document with the changes that were
 * written (including `updatedAt`).
 */
export const updateThreadsByAuthorId = async (
  {
    changes,
    authorId,
  }: {
    changes: Store.UpdateEntity<Thread.Data>["changes"];
    authorId: Thread.Data["authorId"];
  },
  companyId: Company.Data["id"],
): Promise<Store.UpdateEntity<Thread.Data>[]> => {
  const { updatedAt } = getTimestamps();

  const batch = firestore().batch();

  const matchingThreads = await firestore()
    .collection(COLLECTION_IDS.threads)
    .where("companyId", "==", companyId)
    .where("authorId", "==", authorId)
    .get();

  const updatedPayload = matchingThreads.docs.map(
    ({ id, ref }): Store.UpdateEntity<Thread.Data> => {
      // Each entry gets its own copy of the stamped changes.
      const stampedChanges = { ...changes, updatedAt };

      batch.set(ref, stampedChanges, { merge: true });

      return { id, changes: stampedChanges };
    },
  );

  await batch.commit();

  return updatedPayload;
};

/**
 * Soft-deletes a thread by setting `deletedAt` on its Firestore document and
 * in the metadata of the matching LangGraph thread.
 *
 * Both writes are started together. If either one rejects, the returned
 * promise rejects, although the other write may already have been applied.
 *
 * @param params.id - Id of the thread to mark as deleted.
 * @param params.agentUrl - URL of the agent deployment that hosts the thread.
 * @returns The id of the thread that was marked as deleted.
 */
export const deleteThread = async ({
  id,
  agentUrl,
}: {
  id: Thread.Data["id"];
  agentUrl: string;
}): Promise<Thread.Data["id"]> => {
  // The deletion time reuses the `updatedAt` value of the timestamp helper.
  const deletedAt = getTimestamps().updatedAt;

  const firestoreWrite = firestore()
    .collection(COLLECTION_IDS.threads)
    .doc(id)
    .set({ deletedAt }, { merge: true });

  const remoteWrite = createClient(agentUrl).threads.update(id, {
    metadata: { deletedAt },
  });

  await Promise.all([firestoreWrite, remoteWrite]);

  return id;
};

/**
 * Soft-deletes several threads: `deletedAt` is set on every Firestore
 * document through one batch and in the metadata of every matching LangGraph
 * thread.
 *
 * The remote updates and the batch commit run concurrently. If any of them
 * rejects, the returned promise rejects, although other writes may already
 * have been applied.
 *
 * @param params.ids - Ids of the threads to mark as deleted.
 * @param params.agentUrl - URL of the agent deployment that hosts the threads.
 * @returns The same list of ids that was passed in.
 */
export const deleteThreads = async ({
  ids,
  agentUrl,
}: {
  ids: Thread.Data["id"][];
  agentUrl: string;
}): Promise<Thread.Data["id"][]> => {
  // The deletion time reuses the `updatedAt` value of the timestamp helper.
  const deletedAt = getTimestamps().updatedAt;

  const batch = firestore().batch();

  const client = createClient(agentUrl);

  const remoteUpdates: Promise<unknown>[] = ids.map((threadId) => {
    const threadRef = firestore()
      .collection(COLLECTION_IDS.threads)
      .doc(threadId);

    batch.set(threadRef, { deletedAt }, { merge: true });

    return client.threads.update(threadId, { metadata: { deletedAt } });
  });

  await Promise.all([batch.commit(), ...remoteUpdates]);

  return ids;
};

/**
 * Starts a streamed run on a LangGraph thread and forwards every streamed
 * chunk to the caller.
 *
 * The run configuration is the caller's `configuration` with `apiURL` and
 * `model_name` taken from the environment; those two keys override entries
 * of the same name in `configuration`.
 *
 * @param params.agentId - Passed to `client.runs.stream` as the assistant to run.
 * @param params.agentUrl - URL of the agent deployment.
 * @param params.threadId - Thread the run belongs to.
 * @param params.messages - Messages sent as the run input.
 * @param params.callback - Invoked with the thread id and each chunk, in
 * arrival order; its return value is not awaited.
 * @param params.configuration - Optional extra `configurable` values.
 * @returns A promise that resolves once the stream is exhausted.
 */
export const streamMessage = async ({
  agentId,
  agentUrl,
  threadId,
  messages,
  callback,
  configuration,
}: {
  agentId: string;
  agentUrl: string;
  callback: (
    threadId: Thread.Data["id"],
    value: { event: StreamEvent; data: any },
  ) => void;
  threadId: Thread.Data["id"];
  messages: LangChainMessage[];
  configuration?: Record<string, unknown>;
}) => {
  const client = createClient(agentUrl);

  const runInput: Nullable<Record<string, unknown>> = { messages };

  // Environment values come last so they win over caller-supplied keys.
  const configurable = {
    ...configuration,
    apiURL: REACT_APP_API_BASE_URL,
    model_name: REACT_APP_LANGGRAPH_MODEL_NAME,
  };

  const events = client.runs.stream(threadId, agentId, {
    input: runInput,
    config: { configurable },
    streamMode: "messages",
  });

  for await (const event of events) {
    callback(threadId, event);
  }
};
