import { StreamDefinition } from '../realm/stream-definition.js';
import { StoredEvent } from '../event-store/stored-event.js';
import { RuntimeResult } from './runtime-result.js';
import { EventAuth } from '../event-store/event-auth.js';
import {
  EventContextSnapshotClient,
  EventContextSnapshotIndexClient,
  EventContextStorage,
  EventHandleContext,
  EventHandleStateClient,
} from './runtime.js';
import { StreamHostRuntime } from './host-runtime.js';
import { EventRandomSeed } from '../event-store/event-random-seed.js';
import { getError, getErrorStackTrace } from './get-error.js';
import { hostResultBuilder } from './host-result-builder.js';
import { SnapshotClient } from '../storage/snapshot-client.js';
import { DeleteType, PatchType, UpsertType } from '../storage/state-client.js';
import { RuntimeEnv } from '../runtime-env.js';
import {
  StateDefinition,
  StateDefinitions,
  StateIndex,
  StateKeyValues,
  TypeOfStateDeclaration,
} from '../storage/state-declaration.js';
import {
  FilterKey,
  FilterStateIndex,
  FilterValue,
  RangeOptions,
  SnapshotIndexClient,
} from '../storage/snapshot-index-client.js';
import { RangeResult } from '../storage/browse-result.js';

export function createEventContextStorage<
  TState extends StateDefinition<any, any, any>,
>(
  state: SnapshotClient<TState>,
  signal: AbortSignal,
): EventContextSnapshotClient<TState> {
  return {
    get(
      id: StateKeyValues<TState>,
    ): Promise<RangeResult<TypeOfStateDeclaration<TState>> | null> {
      return state.get(id, { signal });
    },
    range(
      bookmark: StateKeyValues<TState> | null,
      opts?: Omit<RangeOptions, 'signal'>,
    ): AsyncGenerator<RangeResult<TypeOfStateDeclaration<TState>>> {
      return state.range(bookmark, {
        open: opts?.open,
        dir: opts?.dir,
        signal,
      });
    },
    index<K extends keyof TState['indices'] & string>(
      index: K,
    ): EventContextSnapshotIndexClient<TState, TState['indices'][K]> {
      const indexState = state.index(index);
      return createEventContextIndexClient(indexState, signal);
    },
  };
}

export function createEventContextIndexClient<
  TState extends StateDefinition<any, any, any>,
  TIndex extends StateIndex<any, any>,
>(
  state: SnapshotIndexClient<TState, TIndex>,
  signal: AbortSignal,
): EventContextSnapshotIndexClient<TState, TIndex> {
  return {
    range(
      bookmark: FilterValue<TState, TIndex> | null,
      opts?: Omit<RangeOptions, 'signal'>,
    ): AsyncGenerator<RangeResult<TypeOfStateDeclaration<TState>>> {
      return state.range(bookmark, {
        signal,
        open: opts?.open,
        dir: opts?.dir,
      });
    },
    filter(
      key: FilterKey<TIndex>,
      value: FilterValue<TState, TIndex>,
    ): EventContextSnapshotIndexClient<TState, FilterStateIndex<TIndex>> {
      return createEventContextIndexClient(state.filter(key, value), signal);
    },
  };
}

export function createStreamHost<
  TStream extends StreamDefinition<StateDefinitions, any, any, any, any>,
>(stream: TStream): StreamHostRuntime<TStream['state'], TStream['events']> {
  return {
    source: stream,
    runtime: {
      async execute(
        name: string,
        state: EventContextStorage<TStream['state']>,
        evt: StoredEvent,
        random: EventRandomSeed,
        env: RuntimeEnv,
        signal: AbortSignal,
      ): Promise<RuntimeResult> {
        try {
          const resultBuilder = hostResultBuilder(stream, evt);

          const ctx: EventHandleContext<any, any, any> = {
            env,
            signal,
            stream: {
              name,
              pattern: stream.pattern,
              allowRead(authId: EventAuth, eventTypes: string[] | string) {
                resultBuilder.addPermissionChange(
                  authId,
                  'grant_event',
                  'read',
                  eventTypes,
                );
              },
              allowReadWrite(authId: EventAuth, eventTypes: string[] | string) {
                resultBuilder.addPermissionChange(
                  authId,
                  'grant_event',
                  'readwrite',
                  eventTypes,
                );
              },
              allowWrite(authId: EventAuth, eventTypes: string[] | string) {
                resultBuilder.addPermissionChange(
                  authId,
                  'grant_event',
                  'write',
                  eventTypes,
                );
              },
              revokeRead(authId: EventAuth, eventTypes: string[] | string) {
                resultBuilder.addPermissionChange(
                  authId,
                  'revoke_event',
                  'read',
                  eventTypes,
                );
              },
              revokeReadWrite(
                authId: EventAuth,
                eventTypes: string[] | string,
              ) {
                resultBuilder.addPermissionChange(
                  authId,
                  'revoke_event',
                  'readwrite',
                  eventTypes,
                );
              },
              revokeWrite(authId: EventAuth, eventTypes: string[] | string) {
                resultBuilder.addPermissionChange(
                  authId,
                  'revoke_event',
                  'write',
                  eventTypes,
                );
              },
            },
            async emit<K extends string>(type: K, data: any): Promise<void> {
              resultBuilder.addOutput(type, data);
            },
            state<K extends string>(
              name: K,
            ): EventHandleStateClient<TStream['state'][K]> {
              return {
                async delete(
                  id: DeleteType<TStream['state'][K]>,
                ): Promise<void> {
                  const { keys } = await state.state(name).delete(id);
                  resultBuilder.addOperation({
                    type: 'delete',
                    state: name,
                    keys,
                  });
                },
                async patch(
                  val: PatchType<TStream['state'][K]>,
                ): Promise<void> {
                  const { data, keys } = await state.state(name).patch(val);
                  resultBuilder.addOperation({
                    type: 'patch',
                    state: name,
                    data,
                    keys,
                  });
                },
                async insert(
                  val: TypeOfStateDeclaration<TStream['state'][K]>,
                ): Promise<void> {
                  const { data, keys } = await state.state(name).insert(val);
                  resultBuilder.addOperation({
                    type: 'insert',
                    state: name,
                    data,
                    keys,
                  });
                },
                async upsert(
                  val: UpsertType<TStream['state'][K]>,
                ): Promise<void> {
                  const { data, keys } = await state.state(name).upsert(val);
                  resultBuilder.addOperation({
                    type: 'upsert',
                    state: name,
                    data,
                    keys,
                  });
                },
              };
            },
            snapshot<K extends string & keyof TStream['state']>(
              name: K,
            ): EventContextSnapshotClient<TStream['state'][K]> {
              return createEventContextStorage(state.snapshot(name), signal);
            },
            random,
            metadata: {
              id: evt.id,
              auth: evt.auth,
              createdAt: evt.createdAt,
              storedAt: evt.storedAt,
              annotations: evt.annotations,
            },
          };

          await stream.handle[evt.event](evt.data, ctx);

          return resultBuilder.get();
        } catch (e) {
          return {
            success: false,
            error: getError(e),
            stacktrace: getErrorStackTrace(e),
          };
        }
      },
    },
  };
}
