返回顶部
首页 > 资讯 > 前端开发 > node.js >KOA+egg.js如何集成kafka消息队列
  • 404
分享到

KOA+egg.js如何集成kafka消息队列

2024-04-02 19:04:59 404人浏览 泡泡鱼
摘要

这篇文章主要为大家展示了“KOA+egg.js如何集成kafka消息队列”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“KOA+egg.js如何集成kafka消息

这篇文章主要为大家展示了“KOA+egg.js如何集成kafka消息队列”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“KOA+egg.js如何集成kafka消息队列”这篇文章吧。

Egg.js : 基于KOA2的企业级框架

Kafka:高吞吐量的分布式发布订阅消息系统

本文章将集成egg + kafka + Mysql日志系统例子

系统要求:日志记录,通过kafka进行消息队列控制

思路图:

KOA+egg.js如何集成kafka消息队列

这里消费者和生产者都由日志系统提供

λ.1 环境准备

①Kafka

官网下载kafka后,解压

启动ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka server

这里config/server.properties中将num.partitions=5,我们设置5个partitions

bin/kafka-server-start.sh config/server.properties

② egg + mysql

根据脚手架搭建好egg,再多安装kafka-node,egg-mysql

mysql 用户名root 密码123456

λ.2 集成

1、根目录新建app.js,这个文件在每次项目加载时候都会运作

'use strict';
 
const kafka = require('kafka-node');
 
module.exports = app => {
 app.beforeStart(async () => {
 const ctx = app.createAnonymousContext();
 
 const Producer = kafka.Producer;
 const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
 const producer = new Producer(client, app.config.producerConfig);
 
 producer.on('error', function(err) {
  console.error('ERROR: [Producer] ' + err);
 });
 
 app.producer = producer;
 
 const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
  autoCommit: false,
 });
 
 consumer.on('message', async function(message) {
  try {
  await ctx.service.log.insert(JSON.parse(message.value));
  consumer.commit(true, (err, data) => {
   console.error('commit:', err, data);
  });
  } catch (error) {
  console.error('ERROR: [GetMessage] ', message, error);
  }
 });
 
 consumer.on('error', function(err) {
  console.error('ERROR: [Consumer] ' + err);
 });
 });
};

上述代码新建了生产者、消费者。

生产者新建后加载进app全局对象。我们将在请求时候生产消息。这里只是先新建实例

消费者获取消息将访问service层的insert方法(数据库插入数据)。

具体参数可以参考kafka-node官方api,往下看会有生产者和消费者的配置参数。

2、controller · log.js

这里获取到了producer,并传往service层

'use strict';
 
const Controller = require('egg').Controller;
 
class LoGController extends Controller {
 
 async notice() {
 const producer = this.ctx.app.producer;
 const Response = new this.ctx.app.Response();
 
 const requestBody = this.ctx.request.body;
 const backInfo = await this.ctx.service.log.send(producer, requestBody);
 this.ctx.body = Response.success(backInfo);
 }
}
 
module.exports = LogController;

3、service · log.js

这里有一个send方法,这里调用了producer.send ,进行生产者生产

insert方法则是数据库插入数据

'use strict';
 
const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');
 
class LogService extends Service {
 async send(producer, params) {
 const payloads = [
  {
  topic: this.ctx.app.config.topic,
  messages: JSON.stringify(params),
  },
 ];
 
 producer.send(payloads, function(err, data) {
  console.log('send : ', data);
 });
 
 return 'success';
 }
 async insert(message) {
 try {
  const logDB = this.ctx.app.mysql.get('log');
  const ip = this.ctx.ip;
 
  const Logs = this.ctx.model.Log.build({
  id: uuidv1(),
  type: message.type || '',
  level: message.level || 0,
  operator: message.operator || '',
  content: message.content || '',
  ip,
  user_agent: message.user_agent || '',
  error_stack: message.error_stack || '',
  url: message.url || '',
  request: message.request || '',
  response: message.response || '',
  created_at: new Date(),
  updated_at: new Date(),
  });
 
  const result = await logDB.insert('logs', Logs.dataValues);
 
  if (result.affectedRows === 1) {
  console.log(`SUCEESS: [Insert ${message.type}]`);
  } else console.error('ERROR: [Insert DB] ', result);
 } catch (error) {
  console.error('ERROR: [Insert] ', message, error);
 }
 }
}
 
module.exports = LogService;

4、config · config.default.js

一些上述代码用到的配置参数具体在这里,注这里开了5个partition。

'use strict';
 
module.exports = appInfo => {
 const config = (exports = {});
 
 const topic = 'logAction_p5';
 
 // add your config here
 config.middleware = [];
 
 config.security = {
 csrf: {
  enable: false,
 },
 };
 
 // mysql database configuration
 config.mysql = {
 clients: {
  basic: {
  host: 'localhost',
  port: '3306',
  user: 'root',
  passWord: '123456',
  database: 'merchants_basic',
  },
  log: {
  host: 'localhost',
  port: '3306',
  user: 'root',
  password: '123456',
  database: 'merchants_log',
  },
 },
 default: {},
 app: true,
 agent: false,
 };
 
 // sequelize config
 config.sequelize = {
 dialect: 'mysql',
 database: 'merchants_log',
 host: 'localhost',
 port: '3306',
 username: 'root',
 password: '123456',
 dialectOptions: {
  requestTimeout: 999999,
 },
 pool: {
  acquire: 999999,
 },
 };
 
 // kafka config
 config.kafkaHost = 'localhost:9092';
 
 config.topic = topic;
 
 config.producerConfig = {
 // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
 partitionerType: 1,
 };
 
 config.consumerTopics = [
 { topic, partition: 0 },
 { topic, partition: 1 },
 { topic, partition: 2 },
 { topic, partition: 3 },
 { topic, partition: 4 },
 ];
 
 return config;
};

5、实体类:

mode · log.js

这里使用了 Sequelize

'use strict';
 
module.exports = app => {
 const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
 
 const Log = app.model.define('log', {
 
 id: { type: STRING(36), primaryKey: true },
 
 type: STRING(100),
 
 level: INTEGER,
 
 operator: STRING(50),
 
 content: TEXT,
 
 ip: STRING(36),
 
 user_agent: STRING(150),
 
 error_stack: TEXT,
 
 url: STRING(255),
 
 request: TEXT,
 
 response: TEXT,
 
 created_at: DATE,
 
 updated_at: DATE,
 });
 
 return Log;
};

6、测试python脚本:

import requests
 
from multiprocessing import Pool
from threading import Thread
 
from multiprocessing import Process
 
 
def loop():
 t = 1000
 while t:
  url = "Http://localhost:7001/log/notice"
 
  payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
  headers = {
  'Content-Type': "application/json",
  'Cache-Control': "no-cache"
  }
 
  response = requests.request("POST", url, data=payload, headers=headers)
 
  print(response.text)
 
if __name__ == '__main__':
 for i in range(10):
  t = Thread(target=loop)
  t.start()

7、建表语句:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
 
-- ----------------------------
-- Table structure for logs
-- ----------------------------
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs` (
 `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
 `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志类型',
 `level` int(11) NULL DEFAULT NULL COMMENT '优先等级(数字越高,优先级越高)',
 `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人',
 `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息',
 `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP',
 `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前用户代理信息',
 `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '错误堆栈',
 `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前URL',
 `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '请求对象',
 `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '响应对象',
 `created_at` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
 `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
 PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
 
SET FOREIGN_KEY_CHECKS = 1;

以上是“KOA+egg.js如何集成kafka消息队列”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程网node.js频道!

--结束END--

本文标题: KOA+egg.js如何集成kafka消息队列

本文链接: https://lsjlt.com/news/75498.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • KOA+egg.js如何集成kafka消息队列
    这篇文章主要为大家展示了“KOA+egg.js如何集成kafka消息队列”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“KOA+egg.js如何集成kafka消息...
    99+
    2024-04-02
  • 消息队列 Kafka
    Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conn...
    99+
    2023-10-23
    kafka 分布式
  • kafka之消息队列
    大数据工具 kafka 学习 之前需要先了解队列的相关知识 了解万队列就知道kafka的用处 之后再详细了解kafka的具体知识和操作 ...
    99+
    2021-05-31
    kafka之消息队列
  • 消息队列-kafka消费异常问题
    目录概述重试一定次数(消息丢失)加入到死讯队列(消息不丢失)总结概述 在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时...
    99+
    2024-04-02
  • 大数据Kafka:消息队列和Kafka基本介绍
    目录一、什么是消息队列二、消息队列的应用场景异步处理 应用耦合限流削峰消息驱动系统 三、消息队列的两种方式点对点模式发布/订阅模式四、常见的消息队列的产品1) RabbitMQ2) ...
    99+
    2024-04-02
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2024-04-02
  • 关于Kafka消息队列原理的总结
    目录Kafka消息队列原理Kafka的逻辑数据模型Kafka的分发策略Kafka的物理存储模型和查找数据的设计Kafka的持久化策略设计Kafka的节点间的数据一致性策略设计Kafk...
    99+
    2024-04-02
  • Java分布式学习之Kafka消息队列
    目录介绍Kafka核心相关名称kafka集群安装kafka使用kafka文件存储Springboot整合kafka介绍 Apache Kafka 是分布式发布-订阅消息系统,在 ka...
    99+
    2024-04-02
  • 详解SpringBoot集成消息队列的案例应用
    目录背景方案规划统一设计集成Redis消息队列集成ActiveMQ消息队列使用示例背景 最近在对公司开发框架进行优化,框架内涉及到多处入库的日志记录,例如登录日志/操作日志/访问日志...
    99+
    2024-04-02
  • Kafka中消息队列的两种模式讲解
    目录Kafka消息队列的两种模式1、点对点模式 2、发布/订阅模式 Kafka消息队列模型图解Kafka消息队列的两种模式 消息队列包括两种模式,点对点模式(po...
    99+
    2024-04-02
  • 如何监听redis消息队列
    监听redis消息队列的示例:springBoot实现redis发布式监听消息队列,定时器监听不同功能的消息,代码:import org.springframework.context.annotation.Bean;import org....
    99+
    2024-04-02
  • redis消息队列如何延时
    redis消息队列延时的示例:延时队列可通过zset来实现,消息的处理时间作为score,最后通过多线程轮询获取到期的score任务即可,代码:public class DelayQueue {static class TaskItem {...
    99+
    2024-04-02
  • redis如何实现消息队列
    Redis可以实现消息队列的功能,常用的实现方式是使用Redis的List数据结构来存储消息队列中的消息。具体实现步骤如下: 将...
    99+
    2024-04-22
    Redis
  • SpringBoot中的消息队列怎么利用redis进行集成
    SpringBoot中的消息队列怎么利用redis进行集成?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、pom文件依赖<dependency>   <...
    99+
    2023-05-31
    springboot redis
  • Redis中如何使用消息队列
    这篇文章主要介绍了Redis中如何使用消息队列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。说到消息队列中间件,我们都会想到RabbitMQ...
    99+
    2024-04-02
  • java如何监听redis消息队列
    java监听redis消息队列的方法:可以使用springBoot实现redis发布式监听消息队列,定时器监听不同功能的消息。java监听redis消息队列的示例:springBoot实现redis发布式监听消息队列,定时器监听不同功能的消...
    99+
    2024-04-02
  • 如何利用redis做消息队列
    利用redis做消息队列的示例:生产者模拟程序,代码:package scheduleTest; import java.util.Random; import java.util.UUID; import redis.clients.je...
    99+
    2024-04-02
  • PHP如何添加MySQL消息队列
    这篇文章主要为大家展示了PHP如何添加MySQL消息队列,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带大家一起来研究并学习一下“PHP如何添加MySQL消息队列”这篇文章吧。消息队列:在消息的传输过程中保存消息的容器。消息...
    99+
    2023-06-06
  • oracle数据如何通过goldengate实时同步到kafka消息队列中
    这篇文章主要介绍oracle数据如何通过goldengate实时同步到kafka消息队列中,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!环境介绍组件版本组件版本描述源端oracleo...
    99+
    2024-04-02
  • 分布式系统中 Golang 函数与消息队列的集成
    在分布式系统中,集成函数和消息队列可实现解耦、可扩展性和弹性,通过使用以下步骤在 golang 中集成:创建 cloud functions 函数。集成消息队列客户端库。处理队列消息。订...
    99+
    2024-04-19
    golang 分布式系统
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作