import { Observable, Subject, share, throwError } from 'rxjs';
import { SignalRHubConnectionState } from './signalr-hub-connection.state';
import { ISignalRHubConnection } from './signalr-hub-connection.interface';

export class SignalRHubConnection implements ISignalRHubConnection {
	private connection: signalR.HubConnection | undefined;
	private startSubject = new Subject<void>();
	private stopSubject = new Subject<void>();
	private stateSubject = new Subject<SignalRHubConnectionState>();
	private errorSubject = new Subject<Error>();

	start$: Observable<void> = this.startSubject.asObservable();
	stop$: Observable<void> = this.stopSubject.asObservable();
	state$: Observable<SignalRHubConnectionState> = this.stateSubject.asObservable();
	error$: Observable<Error> = this.errorSubject.asObservable();

	constructor(
		public hubName: string,
		public url: string,
		private connectionFactory: (url: string) => signalR.HubConnection
	) {}

	start(): Observable<void> {
		const connection = this.ensureConnection();

		connection
			.start()
			.then(() => {
				this.startSubject.next();
				this.stateSubject.next('connected');
			})
			.catch((error) => {
				this.errorSubject.next(error ?? null);
			});

		return this.startSubject.asObservable();
	}

	stop(): Observable<void> {
		if (!this.connection) {
			return throwError(() => new Error('Connection not started'));
		}

		this.connection
			.stop()
			.then(() => {
				this.stopSubject.next();
			})
			.catch((error) => {
				this.errorSubject.next(error);
			});

		return this.stopSubject.asObservable();
	}

	on<T>(eventName: string): Observable<T> {
		return new Observable<T>((observer) => {
			const connection = this.ensureConnection();

			const callback = (data: T) => observer.next(data);

			connection.on(eventName, callback);

			const errorSubscription = this.errorSubject.subscribe(() => {
				observer.error(new Error('The connection has been closed.'));
			});

			const stopSubscription = this.stopSubject.subscribe(() => {
				observer.complete();
			});

			return () => {
				errorSubscription.unsubscribe();
				stopSubscription.unsubscribe();
				connection.off(eventName, callback);
			};
		}).pipe(share());
	}

	private ensureConnection(): signalR.HubConnection {
		if (!this.connection) {
			this.connection = this.connectionFactory(this.url);

			this.connection.onclose((error) => {
				if (error) {
					this.errorSubject.next(error);
				}
				this.stateSubject.next('disconnected');
			});

			this.connection.onreconnecting(() => {
				this.stateSubject.next('reconnecting');
			});

			this.connection.onreconnected(() => {
				this.stateSubject.next('reconnected');
			});
		}

		return this.connection;
	}
}
