Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

how can the client catch the exception and re-establish the connection? #137

Open
phker opened this issue Jan 4, 2022 · 0 comments
Open

Comments

@phker
Copy link

phker commented Jan 4, 2022

If an exception occurs on the server side, or the server (or the port the client is connected to) is restarted, how can the client catch the exception and re-establish the connection?
My code:
`

from Model.RecordModel import RecordModel
from Model.ImageVars import ImageVars
import json

import socket

import threading
import time
import traceback
import logging
import rabbitpy

import DAO.Record

from BLL.MemcacheHelper import cache_get, cache_set

from BLL.RedisHelper import redis_client, acquire_lock, q_lock, release_lock

from BLL.Task2.Step1_YuChuLiThread import Step1_YuChuLi_AppendTask

from BLL.Task2.Upload2OSS_Thread import 上传到OSSV3

from DAO.SuanLiFenPei import GetSuanLiFenPeiConfig

from Config import Seting
#from timeline.TaskTimeLine_MySql import InitTimeLine, SaveRecord, TEnd, TStart

from BLL.RabbitMQConsumer import Consumer

from BLL.RabbitMQHelper import MQConnection, SendMQ, getMQConnection,clearMQConnection

def InitGetJobFromRebbitMQ(processMsgFN):
    """Fetch jobs from RabbitMQ forever, reconnecting after any failure.

    On every pass the function obtains the shared connection, opens a
    channel, declares the durable topic exchange and the durable queue, binds
    the queue to this machine's routing keys (``x-priority`` 999) and to the
    wildcard routing keys (``x-priority`` 1), and hands the queue to
    ``LoopMessage``.

    Any ``Exception`` raised along the way drops the cached connection, is
    logged with its traceback, and is followed by a one-second pause before
    the next attempt, so an unreachable broker does not turn this loop into
    a busy spin.

    Args:
        processMsgFN: Callable forwarded unchanged to ``LoopMessage``.

    This function never returns normally; only exceptions that are not
    subclasses of ``Exception`` (e.g. ``KeyboardInterrupt``) leave the loop.
    """
    queuename = 'Q.AIDetect.AIDetect.on.WebAPI.PhotoIsTaken'

    logging.info(Seting.JiQiHao + "正在初始化MQ队列" + queuename)

    while True:
        try:
            connection = getMQConnection()

            with connection.channel() as MQchannel:
                MQchannel.prefetch_count(1, False)  # receive only one message at a time
                exchange = rabbitpy.Exchange(
                    MQchannel,
                    Seting.RabbitMQ_ExchangeName,
                    exchange_type='topic',
                    durable=True,
                )
                exchange.declare()

                # Bind the exchange to the queue; the exchange lets us say
                # exactly which queue a message should be routed to.
                mqqueue = rabbitpy.Queue(MQchannel, queuename)
                mqqueue.durable = True
                mqqueue.declare()

                # One queue is bound to several routing keys.
                # Disabled alternative kept from the original: one binding
                # per configured group, with a per-group priority.
                # configs = GetSuanLiFenPeiConfig(Seting.JiQiHao)
                # for cfg in configs:
                #     bh_group = str(cfg["bh_group"])
                #     priority = int(cfg["priority"])
                #     args = {"x-priority": priority}
                #     mqqueue.bind(exchange, "CMD.AILoop.AIScore." + bh_group, args)

                # Give priority to image data that belongs to this machine.
                args = {"x-priority": 999}
                mqqueue.bind(exchange, "CMD.AILoop.AIScore." + Seting.JiQiHao, args)
                mqqueue.bind(exchange, "E.WebAPI.PhotoIsTaken." + Seting.JiQiHao, args)

                # Image data from other machines can be handled as well: the
                # images are already uploaded to OSS and are downloaded
                # automatically when scoring.
                args = {"x-priority": 1}
                mqqueue.bind(exchange, "CMD.AILoop.AIScore.#", args)
                mqqueue.bind(exchange, "E.WebAPI.PhotoIsTaken.#", args)
                LoopMessage(mqqueue, processMsgFN)

        except Exception:
            # The connection is dropped rather than closed explicitly: the
            # original author disabled the explicit close to keep
            # MQchannel.close() from looping forever.
            clearMQConnection()
            logging.error(traceback.format_exc())
            # Back off before reconnecting; without this pause a broker that
            # is down makes the loop retry as fast as it can fail.
            time.sleep(1)

def LoopMessage(mqqueue: "rabbitpy.Queue", processMsgFN):
    """Consume messages from *mqqueue* and dispatch each one to *processMsgFN*.

    Every message body is decoded as UTF-8, falling back to GBK when that
    fails, and the resulting text is passed to ``processMsgFN``. The message
    is rejected when the callback returns the string ``"reject"`` and
    acknowledged otherwise. If the callback raises, the error is logged and
    the message is still acknowledged.

    Args:
        mqqueue: Queue to iterate over; its ``name`` is used for logging.
        processMsgFN: Callable taking the decoded message text.

    Returns:
        None. The function returns when iteration ends, on
        ``KeyboardInterrupt``, or (after a one-second pause) when any other
        exception escapes the consuming loop.
    """
    try:
        logging.info(mqqueue.name + '与 RabbitMQ 连接成功,等待消息中。。。')
        for message in mqqueue:
            try:
                strmsg = str(message.body, encoding='UTF-8')
                logging.debug("MQ收到消息" + strmsg)
            except Exception:
                # message.body is bytes here, so it is logged with %r;
                # concatenating it to a str raised TypeError and prevented
                # the GBK fallback below from ever running.
                logging.error("MQ收到消息字符串编码异常%r", message.body)
                strmsg = str(message.body, encoding='GBK')

            try:
                res = processMsgFN(strmsg)
                if res == "reject":
                    logging.error("MQ reject" + strmsg)
                    message.reject()
                else:
                    message.ack()
            except Exception:
                logging.error("MQ reject" + strmsg)
                logging.error(traceback.format_exc())
                # Acknowledged (not rejected) on purpose, as in the original.
                message.ack()

    except KeyboardInterrupt:
        # Exit on CTRL-C.
        logging.info('Exited consumer')
        return
    except Exception:
        logging.error('MQ ERROR')
        logging.error(traceback.format_exc())
        time.sleep(1)
        return  # leave the message loop

`

BLL/RabbitMQHelper.py
`
import time
import traceback
from Config import Seting
import logging

import rabbitpy

# AMQP connection URL assembled from the project settings: credentials, host
# and port come from Seting; the path is "%2f" and the heartbeat query
# parameter is fixed at 60.
URL = (
    f"amqp://{Seting.RabbitMQ_UserName!s}:{Seting.RabbitMQ_Password!s}"
    f"@{Seting.RabbitMQ_IP!s}:{Seting.RabbitMQ_Port!s}/%2f?heartbeat=60"
)

# Module-level cache for the shared connection and channel; both start empty.
MQConnection = MQChannel = None

def clearMQConnection():
    """Forget the cached connection and channel.

    Both module-level references are reset to ``None``, so the next call to
    ``getMQConnection()`` creates a fresh connection.
    """
    global MQConnection, MQChannel

    # The old connection is deliberately not closed here; the close call was
    # left disabled in the original code.
    # NOTE(review): presumably because close() can block when the broker has
    # gone away - confirm before re-enabling it.
    MQConnection = None
    MQChannel = None

def getMQConnection():
    """Return the shared RabbitMQ connection, creating it when necessary.

    A new ``rabbitpy.Connection`` to ``URL`` is opened when no connection is
    cached or the cached one reports ``closed``; otherwise the cached
    connection is returned unchanged.

    Returns:
        The module-level connection object.
    """
    global MQConnection
    if MQConnection is None or MQConnection.closed:
        MQConnection = rabbitpy.Connection(URL)

    return MQConnection

`

I found two bugs.

The first occurs when the server is restarted:

for message in mqqueue:

 ...

  break  # falls into an endless loop, waiting for the server to return the close response message

The second occurs when the server has only just started successfully:

with MQConnection.channel() as MQchannel:

     ....

    When the channel is opened, the call waits indefinitely and never returns.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant