RabbitMQ + Spring Boot + Python的使用过程

需求:后端执行Pytorch框架下的模型,对输入图像的评估,得到一个分数。

首先,实现Java和Python的交互,参考以下资料:

spring boot 项目实现调用python工程的方法_springboot中可以用python吗-CSDN博客

有五种方法:

  1. 执行 Python 脚本:以终端cmd的方式运行.py文件。
  2. Jython:一个 Python 的 Java 实现。(只支持 Python 2.x)
  3. 使用 Web 服务:将 Python 脚本或应用封装为一个 Web 服务,然后通过 HTTP 请求进行交互。
  4. 使用消息队列:实现 Java 和 Python 之间进行异步通信。优点:支持高并发,解耦合。
  5. 使用 gRPC 或 Thrift :使用 gRPC 或 Apache Thrift 进行跨语言的 RPC(远程过程调用)。

尝试一:使用 Runtime 执行 Python 脚本

注:ProcessBuilder 不支持第三方库的 Python 脚本运行。

以下为简单实现的例子,没有考虑执行失败情况。

 public double getModel(String imageDir) throws Exception {
        Process process = Runtime.getRuntime().exec(
            "绝对地址\\envs\\VEnet\\python.exe 绝对地址\\model.py 绝对地址\\example.jpg"
        );
        BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(), "GBK"));
        String line = null;
        Double score = 0.0; // 记录得分
        while ((line = in.readLine()) != null) {
            System.out.println(line);
            score = Double.parseDouble(line);
        }
        in.close();
        int re = process.waitFor(); // re表示Python执行的结果
        return score;

其中,re 用来让主线程等待子线程进行完毕。最终 re 为0或者1,表示子线程是否执行成功。我使用的是conda的虚拟环境,所以用了虚拟环境的python.exe所在的绝对路径。

此外,还可以执行 conda activate 虚拟环境名 && python ...\Model.py ...\example.jpg

此外,应该也可以把Python默认的系统路径,从base环境改成虚拟环境,从而直接执行 python ...\Model.py ...\example.jpg。(未尝试)

然而,这种方法不具备高并发的性能,每次请求都需要配置Python环境(Pytorch)、下载模型,耗费了很多没必要的资源。且耦合度高。所以考虑另外高并发的方法。

尝试二:使用消息队列

1.选择RabbitMQ

主要有四种实现方式,RabbitMQ、Apache Kafka和ActiveMQ,资料是:

一文讲清RabbitMQ、Apache Kafka、ActiveMQ_activemq kafaka-CSDN博客

消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ_mq对比-CSDN博客

  1. 信息传递模式
    1. RabbitMQ和ActiveMQ使用传统消息模型,非常适合需要严格排序和可靠交付消息的应用程序。
    2. Kafka使用发布/订阅消息模型,更适合流数据场景,需要实时处理数据。
  2. 性能
    1. RabbitMQ被设计为可靠的消息系统,这意味着它优先考虑消息传递而不是性能。RabbitMQ可以处理中等消息速率,适用于需要严格排序和可靠传递消息的应用程序。
    2. Kafka被设计为高性能系统,可以处理大量数据并具有低延迟。Kafka通过使用分布式架构和优化顺序I/O来实现这种性能。
    3. ActiveMQ也被设计为高性能系统,可以处理高消息速率。ActiveMQ通过使用异步架构和优化消息批处理来实现这种性能。

开始以为是流数据场景,尝试了Kafka。后来意识到,还需要把模型的输出值返回回来,所以应该是可靠通信。然后说RabbitMQ简单易用,适合初学者,所以果断采用RabbitMQ。

2. 学习

6种消息模型

  1. 基本消息模型:
    1. 1个生产者,1个消费者。
    2. 有消息确认机制(ACK) 
  2. work消息模型:
    1. 和简单队列模式基本一样,不过有一点不同,该模式有多个消费者在监听队列。
    2. 以轮询的方式将消息发给多个消费者确保一条消息只会被一个消费者消费。
    3. 任务分发默认使用的是公平队列调度的原则。
    4. 不需要设置队列和交换机的绑定,因为这个模式会将队列绑定到默认的交换机 。
  3. Publish/subscribe(发布订阅模式):交换器类型是 Fanout
    1. 和上面2种模式默认提供交换机不同的是,该模式需要显示声明交换机
    2. 生产者:声明Exchange,不再声明Queue。 发送消息到Exchange,不再发送到Queue。即,生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
    3. 交换机:将消息转发给与自己绑定的所有队列,实现一个消息被多个消费者消费。
    4. 消费者监听指定的队列获得消息。每个队列可以有多个消费者监听,同样也是以轮询的机制发给消费者。所以,多个消费端监听同一个队列不会重复消费消息。
    5. 注:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
    6. 这个模式需要设置队列和交换机的绑定。
  4. Routing 路由模型:交换机类型是 Direct
    1. 交换机:接收生产者的消息,然后把消息递交给与路由键(routing key)完全匹配的队列
    2. 队列:可以指定多个路由键(routing key)
    3. 生产者:发送消息时需要声明路由键(routing key)
  5. Topics 通配符模式:交换机类型是 Topics
    1. 消费者:监听队列,需要设置队列名称。
    2. 生产者:发送消息,需要设置交换机、路由键(routing key)
    3. 交换机根据路由键(routing key)来转发消息到指定的队列。
  6. RPC 模型:
    1. 基本概念:Callback queue 回调队列、Correlation id 关联标识

4种交换机

  1. Direct Exchange:定向(路由)。交换机通过路由键(routing key)把消息路由到指定的队列。相当于路由键是队列的名字。发送消息时,需要设置路由键。
  2. Topic Exchange:交换机通过通配符匹配路由键和绑定键的关系。把消息交给符合路由模式(routing pattern) 的队列
  3. Fanout Exchange:交换机把消息广播给路由机绑定的所有队列
  4. Headers Exchange:交换机通过消息的headers属性的键值对(key/value)来确定消息的路由。每个队列有一组键值对。当发送消息时,需要在headers属性中设置一组键值对。如果消息的headers中包含了指定的键值对,则该消息将被路由到该队列中。
    1. x-match= all:当消息的所有键值对与绑定的键值对匹配时,才会将消息路由到绑定的队列。这相当于“与”逻辑。如果绑定中没有任何键值对,则所有消息都会被路由到与该绑定相关联的队列。
    2. x-match= any:当消息中的至少一个键值对与绑定的键值对匹配时,就会将消息路由到绑定的队列。这相当于“或”逻辑。如果绑定中没有任何键值对,则没有消息会被路由到与该绑定相关联的队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客 详细解释了交换机。

消息队列可以解决什么问题呢?

  • 业务解耦:A系统需要耦合B、C、D系统,在消息队列之前可以通过共享数据、接口调用等方式来实现业务,现在可以通过消息中间件进行解耦。

  • 削峰填谷:在互联网经常会出现流量突然飙升的情况,以前很多时候就是通过性能优化、加服务器等方式,可以通过消息中间件缓存相关任务,然后按计划的进行处理。

  • 异步:可以通过消息推送及短信发送进行说明,业务平台并不关注具体消息的发送细则,完全可以通过消息队列的方式,直接下发任务,由任务消费者进行处理。

以上都来这两篇文章: 

RabbitMQ介绍 + python操作 - dongye95 - 博客园 (cnblogs.com) 写的很好,面试前看这个。

Python角度介绍RabbitMQ。

还介绍了高级特性:过期时间、消息确认、持久化、死信队列、延迟队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客

Spring Boot角度介绍RabbitMQ。

还介绍了消息持久化、延迟发送(TTL机制和rabbitmq插件两种方式)、可靠性发送与接收。

RabbitTemplate在Spring中的所有方法:

Rabbittemplate所有方法.简介.-CSDN博客

RabbitMQ在Python中的常见错误:

RabbitMq使用中常见错误小结_pika.exceptions.probableauthenticationerror: conne-CSDN博客

pika库的错误我基本都遇到了,可以加深对代码的理解。

3. 准备工作

3.1 安装和配置RabbitMQ

RabbitMQ安装教程(非常详细)从零基础入门到精通,看完这一篇就够了_rabbitmq安装详细教程-CSDN博客

注:15672端口是图形化界面的,而RabbitMQ服务仍然是在默认端口5672上。如果想用新增用户登入图形化界面,需要给新增的用户添加管理员权限。

此外,这篇文章提到:web管理界面把消息内容序列化了(因为它默认使用的还是jdk的序列化的默认序列化器),所以他介绍了如何把web管理界面的默认序列化器更改为json类型的序列化器。这样,我们在web管理界面看消息会更直观。

3.2 Spring Boot端的准备

在Spring Boot引入maven相关依赖:

SpringBoot学习之路---使用RabbitTemplate操作RabbitMq_rabbittemplate用法-CSDN博客

然后,SpringBoot会自动帮我们注入RabbitTemplate。

在yml文件配置 RabbitMQ 的信息:

rabbitmq:
    username: xxx
    password: xxx
    addresses: 127.0.0.1:5672

3.3 Python端的准备

在Python端直接 conda install pika

3.4 模式的思考

尝试简单模式:Spring发送消息。Python接收消息,处理消息,再发送消息。二者都有监听器。这种做法只适用于单线程。在多线程中,如果只使用一个队列,那么不能保证多线程的数据的准确传输;如果每个线程都创建一个队列,那么会造成资源浪费。

后来查阅资料,发现RPC模式非常合适。

2.RPC模式

Spring端

参考了:

RabbitMQ学习整理————基于RabbitMQ实现RPC_spring mvc rabbit mq 如何事项rpc-CSDN博客

他采用了rabbitTemplate.sendAndReceive方法,该方法有三个参数:第一个是交换机(exchange)的名字,第二个是路由键(我感觉就是队列的意思)的名字,第三个则为消息的内容。(注:RabbitMQ中所有的消息都要先通过交换机,空字符串表示使用默认的交换机)

以下我的Spring Boot端的代码:

@Service
public class GetModelRabbitMQService {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    private static final String s = "image2Model";

    public String sendMessage(String imageDir) {
        // 设置correlationId
        String corrId = UUID.randomUUID().toString();
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();
        Message message = new Message(imageDir.getBytes(StandardCharsets.UTF_8), messageProperties);
        System.out.println("Spring要发出咯~~"+message);
        Message response = rabbitTemplate.sendAndReceive("", s, message);
        if(response == null){
            System.out.println("没有收到哟~~~");
            return null;
        }
        else{
            String res = new String(response.getBody(), StandardCharsets.UTF_8);
            System.out.println("Spring收到咯~~"+res);
            return res;
        }
    }
}

其中,客户端在等待回调队列里的数据时,如果有消息出现,它会检查 correlation_id 属性。如果此属性的值与请求匹配,就返回给应用。所以,能从回调队列中得到数据,就说明id一致。

Python端

参考了:

python对RabbitMQ的简单使用_python rabbitmq-CSDN博客

他实现了简单模式、发布订阅模式和RPC模式,我参考了RPC模式,以下是我的代码

if __name__ == '__main__':
    # 连接到RabbitMQ服务器
    user_info = pika.PlainCredentials('xxx', 'xxx')  # 用户名和密码
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
    channel = connection.channel()

    # 声明持久化队列
    queue_name = 'image2Model'
    channel.queue_declare(queue=queue_name ,durable=True) # durable=True,声明队列是持久化。

    # 清空队列,简单模式的持久化需要,RPC模式不需要
    # channel.queue_purge(queue=queue_name)

    def on_request(ch, method, props, body):
        body = body.decode()
        print('body decode后:', body)
        response = str(Train(body))

        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,  # props.reply_to 把消息发送到用来返回消息的queue
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=response,
                         )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)  # 一次处理一个队列

    # auto_ack指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
    channel.basic_consume(queue=queue_name, on_message_callback=on_request)

    print(' 开始监听. To exit press CTRL+C')
    channel.start_consuming()

交换机默认是持久的,队列中的消息属性默认是持久的,即 properties=pika.BasicProperties(delivery_mode=2),其中,1表示非持久。所以,我采用了都设置为持久的方法,即在声明queue的时候,设置queue为持久的,不然无法运行。以下讲到了具体的持久化操作(需要重启服务):RabbitMQ基础学习_rabbitmq channel.basic_qos-CSDN博客

如果auto_ack设置为True,需要手动给消息发送方回复确认。这样,如果程序没有成功运行,可以返回一个错误信息给到客户端。默认是False,即自动回复,然后从内存或者硬盘中删除。 在简单模式中,如果设置了持久化和手动回复,而没有手动回复(只进行了重新发送其他数据,没有再进行手动回复)。每次Python服务启动,都会把之前队列的消息再重新读取和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579054.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

三款数据可视化工具深度解析:Tableau、ECharts与山海鲸可视化

在数字化时代,数据可视化工具成为了企业和个人进行数据分析和决策的重要助手。市面上众多数据可视化工具各具特色,本文将为您介绍三款热门的数据可视化工具,帮助您更好地理解和利用数据。 首先,让我们来认识Tableau。Tableau是一款…

opencv4.8 系列一环境搭搭建

open 运行环境&#xff1a; vs2017 下载地址&#xff1a;https://www.123pan.com/s/cVyRVv-ydPWh.html 一&#xff1a;新建项目 二&#xff1a;核心代码&#xff1a; 在这里插入代码片 #include<opencv2/opencv.hpp>int main(int argc,char** argv) {cv::Mat src cv…

windows服务启动提示‘服务没有响应控制功能’(mysql启动报错)

在安装mysql的时候&#xff0c;在windows服务项启动 或 使用命令net start mysql 时启动是报错&#xff0c;提示 服务没有响应控制功能 发生原因&#xff1a; Windows10 x64 或 更高的操作系统&#xff0c;有些系统缺少一些组件 解决办法&#xff1a; 1、下载最新的 Microsoft …

Mybatis入门-----(1)

Mybaits入门 一、Mybaits框架特点 支持定制化SQL、存储过程、基本路线以及高级映射避免了几乎所有JDBC代码中手动设置参数以及获取结果集支持注解式开发、XML开发 二、开发我第一个MYbatis程序 ①打包方式jar ②引入依赖 mybatis依赖mysql驱动 前面两步的pom.xml文件<?…

如何在自己的网站页面中嵌入一个【悬浮音乐播放器】

如何嵌入【悬浮音乐播放器】 前言正文1.打开网易云网页版2.设置自己想要的高度和宽度看注意事项 3.选择是否为自动播放4.在header.php文件中</head>标签前插入下面代码5.在heard.php 中<body>标签后边增加一个 div层6.复制播放器代码到\<div>标签的里边7.保存…

AD修改元器件的引脚长度

这个地方的两个引脚长度不一样 双击其中的一个引脚。 修改这个位置就好了。

Docker学习(二十五)构建 Arthas 基础镜像

目录 一、简介二、构建基础镜像2.1 下载 Arthas2.2 编写 Dockerfile2.3 构建镜像2.4 创建容器2.5 测试 一、简介 Arthas 是一款由 阿里巴巴 开发的 线上监控诊断工具。通过全局视角实时查看应用负载、内存、GC、线程等信息&#xff0c;能在不修改代码的情况下&#xff0c;对业…

SUPIR图像放大模型介绍与实际测试

✨背景 正如&#xff0c;最顶级的料理只需要最简单的烹饪方法一样&#xff0c;图像放大&#xff0c;是设计领域里边最常面对的一个问题&#xff0c;在AI绘画里边也是很常见的一个课题。虽然现在放大算法、放大模型有很多&#xff0c;但是真的能实现的比较好的&#xff0c;并不…

语义分割——json文件转shp

前言 在用labelme标注遥感图像后会生成json文件&#xff0c;如果我们想要shp文件&#xff0c;下面给出了具体实现流程。 一、依赖配置 import json import geopandas as gpd from shapely.geometry import Polygon from osgeo import gdal import argparse import glob import…

【论文解析】笔触渲染生成 前沿工作梳理

最近的一些工作梳理 2023年 Stroke-based Neural Painting and Stylization with Dynamically Predicted Painting Region 2022年Im2Oil: Stroke-Based Oil Painting Rendering with Linearly Controllable Fineness Via Adaptive Sampling 文章目录 1 Stroke-based Neural P…

【海博】雅思该怎么考?

文章目录 考试类型 考试内容 考试形式 备考资源 考试报名 考试成绩 考试类型 学术类&#xff08;A类&#xff09;适用于&#xff1a;出国留学申请本科&#xff0c;研究生及以上学位&#xff0c;或获得专业资质。学术类考试评估考生的英语水平是否满足进行大学或研究生学习…

【C语言】文件操作(1)

为什么使⽤⽂件&#xff1f; 如果没有⽂件&#xff0c;我们写的程序的数据是存储在电脑的内存中&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就丢失了&#xff0c;等再次运⾏程序&#xff0c;是看不到上次程序的数据的&#xff0c;如果要将数据进⾏持久化的…

Sylar C++高性能服务器学习记录07 【协程模块-知识储备篇】

早在19年5月就在某站上看到sylar的视频了&#xff0c;一直认为这是一个非常不错的视频&#xff0c;由于本人一直是自学编程&#xff0c;基础不扎实&#xff0c;也没有任何人的督促&#xff0c;没能坚持下去&#xff0c;每每想起倍感惋惜。恰逢互联网寒冬&#xff0c;在家无事&a…

vim 插件01:插件管理神器pathogen

1、pathogen简介 Vim 插件 pathogen 是一款历史比较悠久的 Vim 插件管理器。Pathogen 的主要功能是提供一种模块化的方式来管理和加载 Vim 插件。说人话&#xff1a;vim是一款管理各类插件的插卡&#xff0c;使用它会让插件的安装和使用非常方便。 以下是 Pathogen 的主要特点…

【大模型应用篇5】应对裁员潮,突发奇想,打造“收割offer”智能体.......

前段时间飞书大裁员, 不禁让人感到危机四伏,加上《【大模型应用篇4】普通人构建智能体的工具》之前文章介绍了普通人打造智能体的工具, 这节课就带大家利用字节产品coze构建“程序员智能体”, 方便应对裁员,随时做好找工作的准备.打造一款面试智能体,方便各位程序员面试, 这个智…

错误代码126:加载d3dcompiler_43.dll失败,分享多种解决方法

在正常使用电脑的过程中&#xff0c;当我尝试启动并运行一款心仪的游戏时&#xff0c;系统却突然弹出一个令人困扰的错误提示“错误代码126:加载d3dcompiler_43.dll失败”&#xff0c;它会导致游戏无法正常运行。为了解决这个问题&#xff0c;我经过多次尝试和总结&#xff0c;…

22年全国职业技能大赛——Web Proxy配置(web 代理)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; 系统服务&#xff08;22年国赛&#xff09;—— web Proxy服务&#xff08;web代理&#xff09;https://myweb.myskillstree.cn/114.html 目录 RouterSrv …

OGG extract进程占据大量虚拟内存导致服务器内存异常增长分析

现象 oracle服务器一节点内存&#xff0c;一个月来持续升高&#xff0c;近一月上涨10%左右。 问题分析 OS内存使用情况 使用内存最大的10个进程如下&#xff0c;PID为279417占用最大的内存。 查询279417&#xff0c;发现是ogg相关进程。 发现ogg的extract进程占用了大量的虚拟内…

软件测试(Web自动化测试)(二)

一.Selenium WebDriver的基本应用 &#xff08;一&#xff09;安装浏览器驱动 1.关闭浏览器的自动更新功能 以Windows7&#xff08;64位&#xff09;操作系统为例&#xff0c;讲解如何关闭Chrome浏览器的自动更新。首先按下快捷键“WinR”&#xff0c;打开运行对话框&#x…

【备战软考(嵌入式系统设计师)】02-计算机指令

指令集 我们计算机要执行程序&#xff0c;本质上是执行一条条的指令&#xff0c;而指令是从指令集中取出的&#xff0c;目前常见的指令集有CISC&#xff08;Complex Instruction Set Computer&#xff0c;复杂指令集&#xff09;和RISC&#xff08;Reduced Instruction Set Co…
最新文章