import {
  StateDefinition,
  StateIndex,
} from '@aion/core/storage/state-declaration.js';
import { ObservedIndexClient } from '@aion/core/storage/observer/observed-index-client.js';
import {
  FilterKey,
  FilterStateIndex,
  FilterValue,
} from '@aion/core/storage/snapshot-index-client.js';
import { BrowseResult, RangeResult } from '@aion/core/storage/browse-result.js';
import { ObserveRangeOptions } from '@aion/core/storage/observer/observe-range-options.js';
import { IterateOptions } from '@aion/core/storage/observer/iterate-options.js';
import { RemoteDescription } from './remote-description.js';
import { RpcClient } from '../api/create-rpc-client.js';
import { SessionManager } from '../auth/jwt-session.js';
import { Lazy } from '@aion/core/management/lazy.js';
import { StateValue } from '@aion/core/storage/state-value.js';
import { mapToState } from '@aion/core/runtime/get-state-data-keys.js';

export function createRemoteObservedIndexClient<
  TState extends StateDefinition<any, any, any>,
  TIndex extends StateIndex<any, any>,
>(
  state: TState,
  index: TIndex,
  keyIndex: number,
  rpcClient: RpcClient,
  session: SessionManager,
  lazyVersion: Lazy<string>,
  type: 'stream' | 'aggregation',
  streamIdentifier: RemoteDescription,
  stateName: string,
  indexName: string,
  filter: Record<string, StateValue>,
): ObservedIndexClient<TState, TIndex> {
  return {
    nextKey: index.fields[keyIndex],
    filter(
      key: FilterKey<TIndex> & string,
      value: FilterValue<TState, TIndex>,
    ): ObservedIndexClient<TState, FilterStateIndex<TIndex>> {
      return createRemoteObservedIndexClient(
        state,
        index,
        keyIndex + 1,
        rpcClient,
        session,
        lazyVersion,
        type,
        streamIdentifier,
        stateName,
        indexName,
        { ...filter, [key]: value },
      );
    },
    async *iterate(
      bookmark: FilterValue<TState, TIndex> | null,
      iterateOptions: IterateOptions,
    ): AsyncGenerator<RangeResult<any>> {
      const response = await rpcClient['state.index.iterate'].call(
        {
          tenant: streamIdentifier.tenant,
          realm: streamIdentifier.realm,
          version: await lazyVersion.resolve(),
          pattern: streamIdentifier.pattern,
          index: indexName,
          state: stateName,
          args: streamIdentifier.args,
          bookmark: bookmark ?? undefined,
          direction: iterateOptions.dir,
          open: iterateOptions.open,
          filter,
          token: await session.resolveToken(),
          type,
          take: undefined,
        },
        iterateOptions.signal,
      );
      yield* response.result();

      for await (const item of response.result()) {
        yield {
          data: mapToState(state, item.data),
          keys: item.keys,
        };
      }
    },
    async *observeRange(
      bookmark: FilterValue<TState, TIndex> | null,
      options: ObserveRangeOptions,
    ): AsyncGenerator<BrowseResult<any>> {
      const response = await rpcClient['state.index.browse.observe'].call(
        {
          tenant: streamIdentifier.tenant,
          realm: streamIdentifier.realm,
          version: await lazyVersion.resolve(),
          pattern: streamIdentifier.pattern,
          state: stateName,
          index: indexName,
          bookmark: bookmark ?? undefined,
          direction: options?.dir,
          take: options?.take,
          filter,
          args: streamIdentifier.args,
          token: await session.resolveToken(),
          type,
          open: options.open,
        },
        options.signal,
      );
      for await (const item of response.result()) {
        yield {
          items: item.items.map((i) => ({
            data: mapToState(state, i.data),
            keys: i.keys,
          })),
          bookmark: item.bookmark,
        };
      }
    },
  };
}
