import {AMQPChannel, AMQPWebSocketClient} from "@cloudamqp/amqp-client"
import {UUID} from "../adapters/interfaces";
import {NotificationResponse, NotificationResponseFromJSON} from "../generated/models/NotificationResponse";

const AMQP_EXCHANGE_USERS_NOTIFICATIONS = "aios_users_notifications"
export default class Amqp {

    private client: AMQPWebSocketClient
    private channel?: AMQPChannel
    private subscriptions: Map<string, (message: any) => void> = new Map()

    private static INSTANCE: Amqp

    constructor(url: string = "ws://localhost:15670", virtualHost: string, username?: string, password?: string) {
        this.client = new AMQPWebSocketClient(url, virtualHost, username, password)
        this.client.onerror = (error) => {
            console.error("Error in AMQP client", error.message)
        }
    }

    public static getInstance(url: string = "ws://localhost:15670", virtualHost: string, username?: string, password?: string) {
        if (Amqp.INSTANCE) return Amqp.INSTANCE
        return Amqp.INSTANCE = new Amqp(url, virtualHost, username, password)
    }

    public connect(): Promise<void> {
        return this.client.connect()
                .then(connection => connection.channel())
                .then(channel => {
                    this.channel = channel
                })
                .catch(error => console.error("Error connecting to AMQP broker", error))
    }

    public isConnected(): boolean {
        return !this.client.closed && this.channel != null && !this.channel.closed
    }

    public disconnect(): Promise<void> {
        return new Promise((resolve, _) => {
            this.channel?.close("Closed explicitly")
            this.client.close("Closed explicitly")
            resolve()
        })
    }

    public subscribeToNotifications(userId: UUID, onMessageCallback: (notification: NotificationResponse) => void): Promise<void> {
        return new Promise(((resolve, reject) => {
            if (!this.channel) reject("Channel has not been initialized")
            else this.channel!.exchangeDeclare(AMQP_EXCHANGE_USERS_NOTIFICATIONS, "topic", {
                durable: true,
                autoDelete: false,
                internal: false
            }).then(() => this.channel!.queueDeclare(undefined, {
                        durable: false,
                        exclusive: false,
                        autoDelete: false
                    }).then(queue => this.channel!.queueBind(queue.name, AMQP_EXCHANGE_USERS_NOTIFICATIONS, `aios.users.${userId}.notifications`)
                            .then(() => this.channel!.basicConsume(queue.name, {
                                noAck: true // Notification messages do not need an ACK from the client side
                            }, message => {
                                console.trace("Got message: ", message)
                                const payloadAsString = message.bodyToString()
                                console.trace("Got payload: ", payloadAsString)
                                const notification = NotificationResponseFromJSON(JSON.parse(payloadAsString || "{}"))
                                console.debug("Got notification: ", notification)
                                onMessageCallback(notification)
                            }))
                            .then(() => {
                                this.subscriptions.set(`notifications-${userId}`, onMessageCallback)
                                resolve()
                            })
                    )
            )
                    .catch(error => reject(error))
        }))
    }

    public isNotificationsSubscribed(userId: string): boolean {
        return this.subscriptions.has(`notifications-${userId}`)
    }

}
