package comm

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
)

// Connection state events. The NATS disconnect and reconnect handlers
// installed by NewNatsBus send these on NatsBus.reconnChan, and Run consumes
// them.
const (
	// ConnStateDisconned reports that the connection was lost.
	ConnStateDisconned = iota + 1
	// ConnStateReconned reports that the connection was re-established.
	ConnStateReconned
)

// NatsBus wraps a single NATS connection together with the JetStream stream
// watchers and request repliers that Run starts on it.
type NatsBus struct {
	// mu is held by RequestApi for the whole duration of each request.
	mu sync.Mutex
	// nc is the underlying connection; NewNatsBus sets it after connecting.
	nc *nats.Conn
	// reconnChan carries ConnStateDisconned / ConnStateReconned events from
	// the connection handlers to Run.
	reconnChan chan int
	// streamWacther lists the stream watchers Run starts (and restarts after
	// a reconnect).
	streamWacther []*NatsStreamWather
	// topicWatcher is initialised empty by NewNatsBus; nothing in this file
	// reads it.
	topicWatcher []*NatsTopicWather

	// replyers lists the request/reply handlers Run subscribes.
	replyers []*NatsMsgReplyer

	// runtimeStartStreamWatcher is installed by Run and starts an additional
	// stream watcher on the running bus.
	runtimeStartStreamWatcher StartStreamWatcherHandle
}

// StartStreamWatcherHandle starts a stream watcher for conf. Run stores a
// value of this type in NatsBus.runtimeStartStreamWatcher.
type StartStreamWatcherHandle func(conf *NatsStreamWather) error

// NatsStreamWather configures one JetStream pull consumer handled by
// WatchStream / WatchStream1.
type NatsStreamWather struct {
	// Stream is the JetStream stream name; it is created if missing.
	Stream string
	// Topic is the subject the stream captures and the consumer filters on.
	Topic string
	// Queue is used as the durable consumer name.
	Queue string
	// AckWaitMinute is the consumer AckWait in whole minutes; applied when > 0.
	AckWaitMinute int64
	// AckWaitMinuteFloat is the AckWait in fractional minutes; when > 0 it
	// overrides AckWaitMinute.
	AckWaitMinuteFloat float64
	// Entity creates the value a message payload is JSON-decoded into before
	// Cb is called.
	Entity CreateEnity
	// Cb receives the message together with the decoded entity.
	Cb WatcherCallback
	// Cb2 receives the raw message; when set it takes precedence over Cb.
	Cb2                WatcherCallback2
	Exported           bool           // whether this watcher may be called from other deployment packages
	MaxDeliver         int            // MaxDeliver setting applied to the consumer by WatchStream
	AckPolicy          nats.AckPolicy // AckPolicy setting; NOTE(review): WatchStream forces AckExplicitPolicy and does not read this field
}

// NatsMsgReplyer describes a request/reply handler that Run subscribes on
// Subject using the queue group "<Subject>.queue".
type NatsMsgReplyer struct {
	// Subject is the NATS subject to answer requests on.
	Subject string
	// Entity creates the value the request payload is JSON-decoded into;
	// when nil the handler receives a nil entity.
	Entity   CreateEnity
	Exported bool          // whether this replyer may be called from other deployment packages
	Timeout  time.Duration // maximum time allowed for a call; NOTE(review): not read anywhere in this file
	// Cb, when set, is used first; a non-nil result is JSON-encoded and sent
	// back as the raw reply.
	Cb ReplyerHandle
	// Cb2 is used when Cb is nil; its result or error is wrapped in a
	// NatsResponse before replying.
	Cb2 ReplyerHandle2
}

// NatsTopicWather pairs a plain topic with a callback.
// NOTE(review): only the NatsBus.topicWatcher slice refers to this type in
// this file; nothing here subscribes it.
type NatsTopicWather struct {
	// Topic is the subject to watch.
	Topic string
	// Cb is the callback for messages on Topic.
	Cb WatcherCallback
}

// Stream is a callback taking a stream name, topic and group.
// NOTE(review): not used in this file.
type Stream func(streamName string, topic string, group string)

// ListenTopic is a callback taking a topic. NOTE(review): not used in this file.
type ListenTopic func(topic string)

// WatcherCallback handles a stream message together with the entity decoded
// from its JSON payload.
type WatcherCallback func(msg *nats.Msg, entity interface{})

// WatcherCallback2 handles a raw stream message; no decoding is done for it.
type WatcherCallback2 func(msg *nats.Msg)

// ReplyerHandle handles a request and returns the reply value; a nil result
// means no reply is sent.
type ReplyerHandle func(msg *nats.Msg, entity interface{}) interface{}

// ReplyerHandle2 handles a request and returns a result or an error; Run
// wraps either one in a NatsResponse.
type ReplyerHandle2 func(msg *nats.Msg, entity interface{}) (interface{}, error)

// ReplyerHandle3 is like ReplyerHandle2 without a decoded entity.
// NOTE(review): not used in this file.
type ReplyerHandle3 func(msg *nats.Msg) (interface{}, error)

// CreateEnity returns a fresh value (expected to be a pointer) that a message
// payload is JSON-decoded into.
type CreateEnity func() interface{}

// PackMessageCallback processes an entity and returns a RespPackMessage.
// NOTE(review): not used in this file.
type PackMessageCallback func(entity interface{}) (*RespPackMessage, error)

// RespPackMessage is the result of a PackMessageCallback.
type RespPackMessage struct {
	// CanAck presumably tells the caller whether the message may be
	// acknowledged — TODO confirm against the users of this type.
	CanAck bool
}

// NatsResponse is the reply envelope used by RequestApi, RequestPackApi and
// the Cb2 repliers started by Run.
type NatsResponse struct {
	// ErrorNo is 200 on success; any other value is treated as a failure by
	// the request helpers.
	ErrorNo int
	// ErrorDesc is the error text returned to the caller when ErrorNo != 200.
	ErrorDesc string
	Result    string // JSON-encoded result
}

// RequestOptions tunes RequestPackApi.
type RequestOptions struct {
	// Timeout overrides the default 5 second request timeout when > 0.
	Timeout time.Duration
	// PubOpts is not read by RequestPackApi in this file.
	PubOpts []nats.PubOpt
	// DeployPack overrides the default "comm" subject prefix when non-empty.
	DeployPack string
}

// ReqPayload pairs a payload with a deployment package name.
// NOTE(review): not used in this file.
type ReqPayload struct {
	Payload    interface{}
	DeployPack string
}

// ReqPackApi is the JSON body RequestPackApi sends to the package's public
// API subject.
type ReqPackApi struct {
	// Payload is the caller's request data.
	Payload interface{}
	// Subject is the subject the caller asked for.
	Subject string
	// Timeout is the timeout RequestPackApi applies to the request.
	Timeout time.Duration
}

// ReqStreamApi holds the parameters of a stream publish request.
// NOTE(review): not used in this file.
type ReqStreamApi struct {
	Subject string
	Payload interface{}
	PubOpts []nats.PubOpt
}

// CancelSubscribe removes the subscription it was returned for.
type CancelSubscribe func()

// SubscribeCallback handles a message; obj is the decoded payload (nil when
// SubOption.Obj is nil). A non-nil return value is JSON-encoded and sent as
// the reply.
type SubscribeCallback func(obj interface{}, msg *nats.Msg) interface{}

// SubOption configures SubscribeOnce and QueueSubscribe.
type SubOption struct {
	// Sub is the subject to subscribe to.
	Sub string
	// Obj, when set, creates the value each payload is JSON-decoded into.
	Obj func() interface{}
	// Call is invoked for every successfully decoded message.
	Call SubscribeCallback
}

// Subscribe registers msg as the asynchronous handler for topic on the
// underlying connection and returns the resulting subscription.
func (q *NatsBus) Subscribe(topic string, msg nats.MsgHandler) (*nats.Subscription, error) {
	subscription, err := q.nc.Subscribe(topic, msg)
	return subscription, err
}

// SubscribeOnce subscribes to Option.Sub and hands the first message that
// decodes successfully to Option.Call, after which the subscription is
// removed. Messages whose payload cannot be decoded into Option.Obj() are
// terminated and do not consume the single delivery.
//
// A non-nil value returned by Option.Call is JSON-encoded and sent as the
// reply. The returned CancelSubscribe removes the subscription early; it is
// safe to call more than once and from any goroutine.
//
// An error is returned when Option or Option.Call is nil, or when the
// subscription cannot be created.
func (q *NatsBus) SubscribeOnce(Option *SubOption) (CancelSubscribe, error) {
	if Option == nil || Option.Call == nil {
		return nil, errors.New("SubscribeOnce: Option and Option.Call must not be nil")
	}

	// mu guards sub and done: the handler runs on a NATS dispatcher goroutine
	// and may fire before Subscribe has returned to this function.
	var (
		mu   sync.Mutex
		sub  *nats.Subscription
		done bool
	)
	cancel := func() {
		mu.Lock()
		s := sub
		sub = nil
		done = true
		mu.Unlock()
		if s != nil {
			s.Unsubscribe()
		}
	}

	s, err := q.nc.Subscribe(Option.Sub, func(msg *nats.Msg) {
		var p interface{}
		if Option.Obj != nil {
			p = Option.Obj()
			if err := json.Unmarshal(msg.Data, p); err != nil {
				msg.Term()
				fmt.Println(Option.Sub, " payload parse msg err", err)
				return
			}
		}

		// Claim the single delivery; anything arriving after a cancel or a
		// previous delivery is dropped.
		mu.Lock()
		if done {
			mu.Unlock()
			return
		}
		done = true
		mu.Unlock()

		// Unsubscribe through the message's own subscription: the outer sub
		// variable may not be assigned yet when the first message arrives.
		if msg.Sub != nil {
			msg.Sub.Unsubscribe()
		}

		out := Option.Call(p, msg)
		if out == nil {
			return
		}
		payload, err := json.Marshal(out)
		if err != nil {
			// Still answer (with an empty body) so the requester fails fast
			// instead of waiting for its timeout.
			fmt.Println(Option.Sub, " reply marshal err", err)
		}
		if err := msg.Respond(payload); err != nil && !errors.Is(err, nats.ErrMsgNoReply) {
			fmt.Println(Option.Sub, " reply err", err)
		}
	})
	if err != nil {
		return nil, err
	}

	mu.Lock()
	if !done {
		sub = s
	}
	mu.Unlock()
	return cancel, nil
}

// QueueSubscribe subscribes to Option.Sub as a member of the queue group
// "<Option.Sub>.queue" and passes every message to Option.Call.
//
// When Option.Obj is set the payload is JSON-decoded into a fresh Obj() value
// first; messages that fail to decode are terminated and skipped. A non-nil
// value returned by Option.Call is JSON-encoded and sent as the reply.
//
// The returned CancelSubscribe removes the subscription. An error is returned
// when Option or Option.Call is nil, or when the subscription cannot be
// created.
func (q *NatsBus) QueueSubscribe(Option *SubOption) (CancelSubscribe, error) {
	if Option == nil || Option.Call == nil {
		return nil, errors.New("QueueSubscribe: Option and Option.Call must not be nil")
	}

	var sub *nats.Subscription
	cancel := func() {
		if sub != nil {
			sub.Unsubscribe()
			sub = nil
		}
	}

	s, err := q.nc.QueueSubscribe(Option.Sub, Option.Sub+".queue", func(msg *nats.Msg) {
		var p interface{}
		if Option.Obj != nil {
			p = Option.Obj()
			if err := json.Unmarshal(msg.Data, p); err != nil {
				msg.Term()
				fmt.Println(Option.Sub, " payload parse msg err", err)
				return
			}
		}

		out := Option.Call(p, msg)
		if out == nil {
			return
		}
		payload, err := json.Marshal(out)
		if err != nil {
			// Still answer (with an empty body) so the requester fails fast
			// instead of waiting for its timeout.
			fmt.Println(Option.Sub, " reply marshal err", err)
		}
		if err := msg.Respond(payload); err != nil && !errors.Is(err, nats.ErrMsgNoReply) {
			fmt.Println(Option.Sub, " reply err", err)
		}
	})
	if err != nil {
		return nil, err
	}
	sub = s
	return cancel, nil
}

// Publish sends data on topic over the underlying connection.
func (q *NatsBus) Publish(topic string, data []byte) error {
	err := q.nc.Publish(topic, data)
	return err
}

// PublishObj JSON-encodes obj and publishes it on topic. It returns the
// encoding error, without publishing anything, when obj cannot be marshalled.
func (q *NatsBus) PublishObj(topic string, obj interface{}) error {
	data, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	return q.nc.Publish(topic, data)
}

// AddReplyers appends the given repliers to the list that Run subscribes.
func (q *NatsBus) AddReplyers(w ...*NatsMsgReplyer) {
	merged := append(q.replyers, w...)
	q.replyers = merged
}
// AddStreamWacher appends the given stream watchers to the list that Run
// starts.
func (q *NatsBus) AddStreamWacher(w ...*NatsStreamWather) {
	merged := append(q.streamWacther, w...)
	q.streamWacther = merged
}

// RequestPackApi calls an API through a deployment package's public proxy
// subject ("<pack>.<PackPublicApi>").
//
// The request body is a ReqPackApi carrying subject, Payload and the timeout.
// options may be nil; by default the package is "comm" and the timeout is
// 5 seconds. The reply must be a NatsResponse: an ErrorNo other than 200 is
// returned as an error built from ErrorDesc, otherwise Result is JSON-decoded
// into out when out is non-nil.
func (q *NatsBus) RequestPackApi(subject string, Payload interface{}, out interface{}, options *RequestOptions) error {

	pack := "comm"
	timeout := time.Second * 5
	if options != nil {
		if len(options.DeployPack) > 0 {
			pack = options.DeployPack
		}
		if options.Timeout > 0 {
			timeout = options.Timeout
		}
	}

	payload, err := json.Marshal(&ReqPackApi{Payload: Payload, Subject: subject, Timeout: timeout})
	if err != nil {
		// Do not send a request with an empty body when Payload is not encodable.
		return err
	}
	msg, err := q.nc.Request(fmt.Sprintf("%s.%s", pack, PackPublicApi), payload, timeout)
	if err != nil {
		return err
	}

	result := &NatsResponse{}
	if err = json.Unmarshal(msg.Data, result); err != nil {
		return err
	}
	if result.ErrorNo != 200 {
		return errors.New(result.ErrorDesc)
	}

	if out != nil {
		return json.Unmarshal([]byte(result.Result), out)
	}
	return nil
}

// RequestApi calls a local API on subject and decodes the NatsResponse reply.
//
// data is JSON-encoded as the request body (an empty body is sent when data
// is nil). An ErrorNo other than 200 is returned as an error built from
// ErrorDesc; otherwise Result is JSON-decoded into out when out is non-nil.
func (q *NatsBus) RequestApi(subject string, data interface{}, timeout time.Duration, out interface{}) error {
	// The bus mutex is held for the whole round trip, so RequestApi calls on
	// one bus run one at a time.
	q.mu.Lock()
	defer q.mu.Unlock()

	payload := []byte{}
	if data != nil {
		var err error
		if payload, err = json.Marshal(data); err != nil {
			// Do not send a request with an empty body when data is not encodable.
			return err
		}
	}
	msg, err := q.nc.Request(subject, payload, timeout)
	if err != nil {
		return err
	}
	result := &NatsResponse{}
	if err = json.Unmarshal(msg.Data, result); err != nil {
		return err
	}
	if result.ErrorNo != 200 {
		return errors.New(result.ErrorDesc)
	}
	if out != nil {
		return json.Unmarshal([]byte(result.Result), out)
	}
	return nil
}

// Request sends data (JSON-encoded; an empty body when data is nil) on
// subject and JSON-decodes the raw reply into out. Unlike RequestApi the
// reply is not expected to be wrapped in a NatsResponse. When out is nil the
// reply body is discarded.
func (q *NatsBus) Request(subject string, data interface{}, timeout time.Duration, out interface{}) error {
	payload := []byte{}
	if data != nil {
		var err error
		if payload, err = json.Marshal(data); err != nil {
			// Do not send a request with an empty body when data is not encodable.
			return err
		}
	}
	msg, err := q.nc.Request(subject, payload, timeout)
	if err != nil {
		return err
	}
	if out == nil {
		// Matches RequestApi: a caller that does not want the result passes nil.
		return nil
	}
	return json.Unmarshal(msg.Data, out)
}

// Quit shuts the connection down gracefully.
//
// Drain is asynchronous: it lets in-flight messages and buffered publishes
// complete and then closes the connection itself. Calling Close right after
// Drain would cut that short, so Close is only used as a fallback when the
// drain cannot be started. Quit is a no-op on a bus without a connection.
func (q *NatsBus) Quit() {
	if q.nc == nil {
		return
	}
	if err := q.nc.Drain(); err != nil {
		q.nc.Close()
	}
}

// NewNatsBus2 connects like NewNatsBus and additionally registers replyers as
// the bus's request/reply handlers.
func NewNatsBus2(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers []*NatsMsgReplyer) (*NatsBus, error) {
	created, connErr := NewNatsBus(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther)
	if connErr == nil {
		created.replyers = replyers
		return created, nil
	}
	return nil, connErr
}

// NewNatsBus3 is the variadic form of NewNatsBus2: it connects like
// NewNatsBus and registers the given replyers on the new bus.
func NewNatsBus3(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather, replyers ...*NatsMsgReplyer) (*NatsBus, error) {
	return NewNatsBus2(natsUri, MaxReconnect, ReconnDelaySecond, streamWacther, replyers)
}

// NewNatsBus connects to the NATS server at natsUri (nats.DefaultURL when
// empty) and returns a bus holding the given stream watchers.
//
// MaxReconnect is passed to nats.MaxReconnects and ReconnDelaySecond is the
// wait between reconnect attempts, in seconds. Disconnect and reconnect
// events are forwarded on the bus's reconnChan for Run to act on.
//
// NOTE(review): reconnChan is unbuffered, so the disconnect/reconnect
// handlers block until Run receives the event; a bus that never calls Run
// will stall those handlers.
func NewNatsBus(natsUri string, MaxReconnect int, ReconnDelaySecond int, streamWacther []*NatsStreamWather) (*NatsBus, error) {

	url := natsUri
	if len(url) < 1 {
		url = nats.DefaultURL
	}

	fmt.Println("conning to nats ====> ", url)

	bus := &NatsBus{
		reconnChan:    make(chan int),
		streamWacther: streamWacther,
		topicWatcher:  []*NatsTopicWather{},
	}

	const stamp = "2006-01-02 15:04:05"
	// reason renders an error quoted as before, but prints "<nil>" instead of
	// the "%!q(<nil>)" noise fmt produces for a nil error (clean disconnects
	// and closes carry no error).
	reason := func(err error) string {
		if err == nil {
			return "<nil>"
		}
		return fmt.Sprintf("%q", err)
	}

	nc, err := nats.Connect(url,
		// nats.Name("render1"),
		nats.MaxReconnects(MaxReconnect),
		// Convert to Duration before multiplying so the product cannot
		// overflow a 32-bit int.
		nats.ReconnectWait(time.Duration(ReconnDelaySecond)*time.Second),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			fmt.Printf("%s Got disconnected! Reason: %s\n", time.Now().Format(stamp), reason(err))
			bus.reconnChan <- ConnStateDisconned
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			fmt.Printf("%s Got reconnected to %v!\n", time.Now().Format(stamp), nc.ConnectedUrl())
			bus.reconnChan <- ConnStateReconned
		}),

		nats.ClosedHandler(func(nc *nats.Conn) {
			tip := fmt.Sprintf("%s Connection closed. Reason: %s\n", time.Now().Format(stamp), reason(nc.LastError()))
			fmt.Println(tip)
			// panic(tip)
		}),
	)
	if err != nil {
		fmt.Println("conn to nats failed ====> ", url, err)
		return nil, err
	}
	bus.nc = nc
	fmt.Printf("%s nats bus Connected: %s\n", time.Now().Format(stamp), url)

	return bus, nil
}

// JetStream returns a JetStream context for the underlying connection.
func (q *NatsBus) JetStream() (nats.JetStreamContext, error) {
	jsCtx, err := q.nc.JetStream()
	return jsCtx, err
}

// CreateStream makes sure a JetStream stream called streamName exists and
// returns the JetStream context used to reach it.
//
// An existing stream is reused as-is. Otherwise a work-queue stream bound to
// topic with at most 5 consumers is added. A failed StreamInfo lookup is
// treated the same as "stream does not exist".
func (q *NatsBus) CreateStream(streamName string, topic string) (nats.JetStreamContext, error) {
	jsCtx, err := q.nc.JetStream()
	if err != nil {
		return nil, err
	}

	if existing, _ := jsCtx.StreamInfo(streamName); existing != nil {
		return jsCtx, nil
	}

	cfg := &nats.StreamConfig{
		Name:         streamName,
		Subjects:     []string{topic},
		Retention:    nats.WorkQueuePolicy,
		MaxConsumers: 5,
	}
	if _, err = jsCtx.AddStream(cfg); err != nil {
		fmt.Println("bus CreateStream err=>", err.Error())
		return nil, err
	}
	return jsCtx, nil
}

// WatchStream consumes conf.Topic from the JetStream stream conf.Stream
// through the durable pull consumer conf.Queue, one message at a time, until
// one of the contexts is cancelled.
//
// The stream is created when missing. The consumer is created, or updated
// when its MaxDeliver / AckWait differ from conf, always with explicit
// acknowledgement. Each message goes to conf.Cb2 when set; otherwise to
// conf.Cb together with the value produced by conf.Entity and filled from the
// JSON payload (nil when conf.Entity is nil). Payloads that fail to decode
// are terminated. Acknowledging a message is the callback's job.
//
// ctxTerm signals process termination: the bus connection is shut down via
// Quit before returning. ctxConn signals a lost connection: the watcher just
// returns so it can be started again after the reconnect. Setup failures are
// returned as errors; a cancelled context yields nil.
func (q *NatsBus) WatchStream(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {

	streamName := conf.Stream
	topic := conf.Topic
	queue := conf.Queue
	fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)

	// Create the backing JetStream stream so published messages are not lost.
	js, err := q.CreateStream(streamName, topic)
	if err != nil {
		fmt.Println("WatchStream  error=>", err.Error())
		return err
	}

	// Consumer settings, including MaxDeliver.
	consumerConfig := &nats.ConsumerConfig{
		Durable:       queue,
		AckPolicy:     nats.AckExplicitPolicy, // always explicit acknowledgement
		MaxDeliver:    conf.MaxDeliver,
		FilterSubject: topic,
	}

	if conf.AckWaitMinute > 0 {
		consumerConfig.AckWait = time.Duration(conf.AckWaitMinute * int64(time.Minute))
	}

	// The fractional setting wins when both are given.
	if conf.AckWaitMinuteFloat > 0 {
		consumerConfig.AckWait = time.Duration(int64(conf.AckWaitMinuteFloat * float64(time.Minute)))
	}

	// Check whether the consumer already exists.
	consumer, err := js.ConsumerInfo(streamName, queue)
	if err == nil {
		// It exists: update it only when the relevant settings changed.
		needUpdate := consumer.Config.MaxDeliver != conf.MaxDeliver ||
			consumer.Config.AckWait != consumerConfig.AckWait

		if needUpdate {
			_, err = js.UpdateConsumer(streamName, consumerConfig)
			if err != nil {
				fmt.Println("WatchStream UpdateConsumer err ", conf.Stream, conf.Topic, err)
				return err
			}
		}
	} else {
		// It does not exist (or could not be looked up): create it.
		_, err = js.AddConsumer(streamName, consumerConfig)
		if err != nil {
			fmt.Println("WatchStream AddConsumer err ", conf.Stream, conf.Topic, err)
			return err
		}
	}

	// Pull-mode subscription bound to the consumer prepared above.
	sub, err := js.PullSubscribe(topic, queue, nats.BindStream(streamName))
	if err != nil {
		fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
		return err
	}
	// Release the client-side subscription when this watcher stops; Run starts
	// a fresh watcher after every reconnect, so without this the old
	// subscriptions would pile up. The consumer existed before PullSubscribe
	// bound to it, so this is not expected to remove the durable consumer.
	defer func() { _ = sub.Unsubscribe() }()
	fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)

	for {
		select {
		case <-ctxTerm.Done():
			fmt.Println("Watching Stream Termed", streamName, topic)
			// Shut the connection down.
			q.Quit()
			return nil
		case <-ctxConn.Done():
			fmt.Println("Watching Stream Conn closed", streamName, topic)
			return nil
		default:
		}

		msgs, err := sub.Fetch(1)
		if err != nil {
			// A timeout only means "no message yet". Any other error (closed
			// connection, invalid subscription, ...) tends to return
			// immediately, so pause briefly instead of spinning.
			if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, context.DeadlineExceeded) {
				fmt.Println("WatchStream Fetch err ", streamName, topic, err)
				select {
				case <-ctxTerm.Done():
				case <-ctxConn.Done():
				case <-time.After(time.Second):
				}
			}
			continue
		}
		if len(msgs) == 0 {
			continue
		}
		msg := msgs[0]

		// Metadata is only used for logging; never dereference it on error.
		if meta, err := msg.Metadata(); err != nil {
			fmt.Println("WatchStream Metadata err ", streamName, topic, err)
		} else {
			fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
			fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
			fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
		}

		switch {
		case conf.Cb2 != nil:
			conf.Cb2(msg)
		case conf.Cb != nil:
			var req interface{}
			// Decode only when there is something to decode into; a watcher
			// without Entity gets a nil entity, as the repliers in Run do.
			if conf.Entity != nil {
				req = conf.Entity()
				if err := json.Unmarshal(msg.Data, req); err != nil {
					msg.Term()
					fmt.Println("work msg err=>", err.Error())
					continue
				}
			}
			conf.Cb(msg, req)
		default:
			fmt.Println("error no stream wather callback!!!!")
		}
	}
}

// WatchStream1 is a variant of WatchStream that lets the pull subscription
// set up the durable consumer itself, with a fixed MaxDeliver of 5 and an
// optional AckWait taken from conf.
//
// Messages from conf.Topic on conf.Stream are fetched one at a time through
// the durable conf.Queue. Each goes to conf.Cb2 when set; otherwise to
// conf.Cb together with the value produced by conf.Entity and filled from the
// JSON payload (nil when conf.Entity is nil). Payloads that fail to decode
// are terminated. Acknowledging a message is the callback's job.
//
// ctxTerm signals process termination: the bus connection is shut down via
// Quit before returning. ctxConn signals a lost connection: the watcher just
// returns. Setup failures are returned as errors; a cancelled context yields
// nil.
func (q *NatsBus) WatchStream1(ctxTerm, ctxConn context.Context, conf *NatsStreamWather) error {

	streamName := conf.Stream
	topic := conf.Topic
	queue := conf.Queue
	fmt.Println("Watching Stream ", conf.Stream, conf.Topic, conf.Queue)

	// Create the backing JetStream stream so published messages are not lost.
	js, err := q.CreateStream(streamName, topic)
	if err != nil {
		fmt.Println("WatchStream  error=>", err.Error())
		return err
	}

	opts := []nats.SubOpt{nats.BindStream(streamName), nats.MaxDeliver(5)}

	if conf.AckWaitMinute > 0 {
		opts = append(opts, nats.AckWait(time.Duration(conf.AckWaitMinute*int64(time.Minute))))
	}

	// Appended last, so the fractional setting wins when both are given.
	if conf.AckWaitMinuteFloat > 0 {
		opts = append(opts, nats.AckWait(time.Duration(int64(conf.AckWaitMinuteFloat*float64(time.Minute)))))
	}

	sub, err := js.PullSubscribe(topic, queue, opts...)
	if err != nil {
		fmt.Println("WatchStream QueueSubscribeSync err ", conf.Stream, conf.Topic, err)
		return err
	}
	fmt.Println("Watching Stream succ=>", conf.Stream, conf.Topic, conf.Queue)

	for {
		select {
		case <-ctxTerm.Done():
			fmt.Println("Watching Stream Termed", streamName, topic)
			// Shut the connection down.
			q.Quit()
			return nil
		case <-ctxConn.Done():
			fmt.Println("Watching Stream Conn closed", streamName, topic)
			return nil
		default:
		}

		msgs, err := sub.Fetch(1)
		if err != nil {
			// A timeout only means "no message yet". Any other error (closed
			// connection, invalid subscription, ...) tends to return
			// immediately, so pause briefly instead of spinning.
			if !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, context.DeadlineExceeded) {
				fmt.Println("WatchStream Fetch err ", streamName, topic, err)
				select {
				case <-ctxTerm.Done():
				case <-ctxConn.Done():
				case <-time.After(time.Second):
				}
			}
			continue
		}
		if len(msgs) == 0 {
			continue
		}
		msg := msgs[0]

		// Metadata is only used for logging; never dereference it on error.
		if meta, err := msg.Metadata(); err != nil {
			fmt.Println("WatchStream Metadata err ", streamName, topic, err)
		} else {
			fmt.Println("meta for ", meta.Stream, "#", topic, "#", meta.Consumer, "#")
			fmt.Println("meta NumDelivered", meta.NumDelivered, "NumPending ", meta.NumPending)
			fmt.Println("meta Sequence.Consumer", meta.Sequence.Consumer, "Sequence.Stream ", meta.Sequence.Stream)
		}

		switch {
		case conf.Cb2 != nil:
			conf.Cb2(msg)
		case conf.Cb != nil:
			var req interface{}
			// Decode only when there is something to decode into; a watcher
			// without Entity gets a nil entity, as the repliers in Run do.
			if conf.Entity != nil {
				req = conf.Entity()
				if err := json.Unmarshal(msg.Data, req); err != nil {
					msg.Term()
					fmt.Println("work msg err=>", err.Error())
					continue
				}
			}
			conf.Cb(msg, req)
		default:
			fmt.Println("error no stream wather callback!!!!")
		}
	}
}
// GetNatsConn exposes the underlying NATS connection.
func (q *NatsBus) GetNatsConn() *nats.Conn {
	conn := q.nc
	return conn
}

// Run starts every registered stream watcher and replyer, calls cb (when
// non-nil) once they are started, and then blocks forever reacting to
// connection state events.
//
// On SIGINT/SIGTERM the termination context is cancelled, which makes the
// stream watchers shut the connection down, and the process exits with
// status 1 one second later. On disconnect the running stream watchers are
// stopped; on reconnect they are started again. Replyer subscriptions are
// created once per replyer: the NATS client restores its subscriptions after
// a reconnect, so only replyers added since the last pass are subscribed.
//
// Run also installs runtimeStartStreamWatcher so further watchers can be
// started on the running bus.
func (q *NatsBus) Run(cb func()) {
	ctxTerm, cancelTerm := context.WithCancel(context.Background())
	defer cancelTerm()

	// connMu guards ctxConn/cancelConn: the event loop below replaces them on
	// every reconnect while runtimeStartStreamWatcher may read them from
	// another goroutine.
	var connMu sync.Mutex
	ctxConn, cancelConn := context.WithCancel(context.Background())
	defer func() {
		connMu.Lock()
		defer connMu.Unlock()
		cancelConn()
	}()
	currentConnCtx := func() context.Context {
		connMu.Lock()
		defer connMu.Unlock()
		return ctxConn
	}

	go func() {
		exit := make(chan os.Signal, 1)
		signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
		<-exit
		cancelTerm()
		// Give the watchers a moment to wind down before exiting.
		time.Sleep(1 * time.Second)
		os.Exit(1)
	}()

	// subscribeReplyer registers the request handler for one replyer.
	subscribeReplyer := func(r1 *NatsMsgReplyer) error {
		queue := fmt.Sprintf("%s.queue", r1.Subject)
		_, err := q.nc.QueueSubscribe(r1.Subject, queue, func(msg *nats.Msg) {
			// A panicking handler must not take the whole process down.
			defer func() {
				if err := recover(); err != nil {
					fmt.Println("catch error ", err)
				}
			}()

			fmt.Println("replay for ", r1.Subject)
			var req interface{}
			if r1.Entity != nil {
				req = r1.Entity()
				if err := json.Unmarshal(msg.Data, req); err != nil {
					msg.Term()
					fmt.Println("rplyer work msg err", err.Error())
					return
				}
			}

			if r1.Cb != nil {
				out := r1.Cb(msg, req)
				if out == nil {
					return
				}
				payload, err := json.Marshal(out)
				if err != nil {
					fmt.Println("error reply for", r1.Subject, err)
				}
				if err = msg.Respond(payload); err != nil {
					fmt.Println("error reply for", r1.Subject, err)
				}
				return
			}

			if r1.Cb2 != nil {
				out, err := r1.Cb2(msg, req)
				resp := &NatsResponse{ErrorNo: 200}
				if err != nil {
					fmt.Println("error reply2 for", r1.Subject, err)
					resp.ErrorDesc = err.Error()
					resp.ErrorNo = 400
				} else if encoded, merr := json.Marshal(out); merr != nil {
					// Report an unencodable result instead of a "success"
					// with an empty Result the caller cannot decode.
					fmt.Println("error reply2 for", r1.Subject, merr)
					resp.ErrorDesc = merr.Error()
					resp.ErrorNo = 500
				} else {
					resp.Result = string(encoded)
				}
				// NatsResponse holds only an int and strings, so encoding it
				// cannot fail.
				payload, _ := json.Marshal(resp)
				if err = msg.Respond(payload); err != nil {
					fmt.Println("error reply2 for", r1.Subject, err)
				}
				return
			}
		})
		return err
	}

	// subscribed remembers which replyers already have a subscription; it is
	// only touched from this goroutine.
	subscribed := make(map[*NatsMsgReplyer]struct{})
	subscribeReplyers := func() {
		for _, rplyer := range q.replyers {
			if rplyer == nil {
				continue
			}
			if _, ok := subscribed[rplyer]; ok {
				continue
			}
			if err := subscribeReplyer(rplyer); err != nil {
				fmt.Println("error subscribe replyer", rplyer.Subject, err)
				continue
			}
			subscribed[rplyer] = struct{}{}
		}
	}

	// startWatchers launches one goroutine per registered stream watcher,
	// bound to the current connection context.
	startWatchers := func() {
		connCtx := currentConnCtx()
		for _, item := range q.streamWacther {
			go q.WatchStream(ctxTerm, connCtx, item)
		}
	}

	q.runtimeStartStreamWatcher = func(conf *NatsStreamWather) error {
		go q.WatchStream(ctxTerm, currentConnCtx(), conf)
		return nil
	}

	startWatchers()
	subscribeReplyers()
	if cb != nil {
		cb()
	}

	for state := range q.reconnChan {
		switch state {
		case ConnStateDisconned:
			// Stop the watchers that belong to the lost connection.
			connMu.Lock()
			cancelConn()
			connMu.Unlock()
		case ConnStateReconned:
			connMu.Lock()
			// Cancel first so watchers of the previous epoch cannot survive
			// next to the new ones; cancelling twice is harmless.
			cancelConn()
			ctxConn, cancelConn = context.WithCancel(context.Background())
			connMu.Unlock()

			startWatchers()
			subscribeReplyers()
		}
	}
}