返回顶部
首页 > 资讯 > 后端开发 > Python >Kafka系列3-python版本pro
  • 698
分享到

Kafka系列3-python版本pro

版本系列Kafka 2023-01-31 08:01:43 698人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

直接上代码了: # -*- coding: utf-8 -*- ''' 使用kafka-python 1.3.3模块 ''' import sys import time import JSON from kafka im

直接上代码了:


# -*- coding: utf-8 -*-

'''
    使用kafka-python 1.3.3模块
'''

import sys
import time
import JSON

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


KAFAKA_HOST = "127.0.0.1"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "foobar"


class Kafka_producer():
    '''
    生产模块:根据不同的key,区分消息
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic, key):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.key = key
        self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.fORMat(
                                        kafka_host=self.kafkaHost,
                                        kafka_port=self.kafkaPort)
                        )

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, key=self.key, value=parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print e



class Kafka_consumer():
    '''
    消费模块: 通过不同groupid消费topic里面的消息
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.key = key
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                    bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                                        kafka_host=self.kafkaHost,
                                        kafka_port=self.kafkaPort )
                        )

    def consume_data(self):
        try:
            for message in self.consumer:
                yield message
        except KeyboardInterrupt, e:
            print e


def main(xtype, group, key):
    '''
    测试consumer和producer
    '''
    if xtype == "p":
        # 生产模块
        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
        print "===========> producer:", producer
        for _id in range(100):
           params = '{"msg" : "%s"}' % str(_id)
           producer.sendjsondata(params)
           time.sleep(1)

    if xtype == 'c':
        # 消费模块
        consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
        print "===========> consumer:", consumer
        message = consumer.consume_data()
        for msg in message:
            print 'msg---------------->', msg
            print 'key---------------->', msg.key
            print 'offset---------------->', msg.offset



if __name__ == '__main__':
    xtype = sys.argv[1]
    group = sys.argv[2]
    key = sys.argv[3]
    main(xtype, group, key)
    


--结束END--

本文标题: Kafka系列3-python版本pro

本文链接: https://lsjlt.com/news/192725.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Kafka系列3-python版本pro
    直接上代码了: # -*- coding: utf-8 -*- ''' 使用kafka-Python 1.3.3模块 ''' import sys import time import json from kafka im...
    99+
    2023-01-31
    版本 系列 Kafka
  • CentOs7将Python版本从3.x
    删除原来的软连接 [root@localhost bin]# rm -rf /usr/bin/python 建立新的连接 [root@localhost bin]# ln -s /usr/bin/python2.7 /usr/bi...
    99+
    2023-01-31
    版本 Python
  • Python环境版本中怎么安装3.X版本
    本篇内容介绍了“Python环境版本中怎么安装3.X版本”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Python环境版本在与日俱增的发展进...
    99+
    2023-06-17
  • Python 3版本较之前版本语法的一些
    市面上的Python教程基本都是以3.0以下版本来讲解的,python 从3.0之后一些语法都做了写更改,有时候可能会浪费比较多的时间,记录下使用过程中遇到的情况以备后查。 1、Print (1)需要加括号 (2)打印文件重定向 (1)pr...
    99+
    2023-01-31
    版本 语法 Python
  • Python 检测系统时间,k8s版本,redis集群,etcd,mysql,ceph,kafka
    线上有一套k8s集群,部署了很多应用。现在需要对一些基础服务做一些常规检测,比如:系统时间,要求:k8s的每一个节点的时间,差值上下不超过2秒k8s版本,要求:k8s的每一个节点的版本必须一致redis集群,要求:1. 查看cluster ...
    99+
    2023-01-31
    集群 检测系统 版本
  • ORACLE系列脚本3:救命的JOB处理脚本
    背景:数据库出现JOB长期执行不完导致资源耗费过大的情况通过下列预计可以快速定位JOB,快速干预处理,恢复数据库性能。通过下列语句长期运维T以上数据库个,屡试不爽。 找出正在执行的JOB编号及其会话编号 ...
    99+
    2024-04-02
  • Python 2与Python 3版本和编码的对比
    一、版本对比 首先要说的是,Python的版本,目前主要分为两大类: Python 2.x的版本的,被称为Python2:是目前用的最广泛的,比如Python 2.7.3。 Python 3.x的版本的,被...
    99+
    2022-06-04
    版本 Python
  • Anaconda版本与Python版本的对应关系
    Anaconda版本与Python版本的对应关系 Anaconda是一个用于数据科学和机器学习的开源发行版,它提供了许多常用的Python库和工具。Anaconda的不同版本与Python版本有一定的...
    99+
    2023-10-22
    python 开发语言 Python
  • python 学习系列(3) 读取并显示
    python 读取并显示图片的两种方法 在 python 中除了用 opencv,也可以用 matplotlib 和 PIL 这两个库操作图片。本人偏爱 matpoltlib,因为它的语法更像 matlab。 1. 显示图片 imp...
    99+
    2023-01-31
    系列 python
  • CCNA系列课程(3)CDP及设备基本操
    第三节课     CDP及Cisco 设备常规操作 杜飞 2009-06-25        今天主要介绍一些Cisco设备的自动发现协议:CDP,然后再介绍一下设备的基本操作如接口类型,常见命令等。 首先咱们先来看第一个知识点 Cisco...
    99+
    2023-01-31
    课程 系列 设备
  • python笔记之2.x上兼容3.x版本
    在前文《python笔记之3.x与2.x的使用区别》谈及了不同版本的区别问题。长远看软件新版本肯定会取代低版本的,除非你有成熟的老版本代码必须考虑兼容性问题,一般还是推荐新手学习新版本。 最近学习python,主要使用3.3版本,但看代码和...
    99+
    2023-01-31
    版本 笔记 python
  • Anaconda版本和Python版本对应关系(持续更新...)
    简介         Anaconda是包管理工具,是专注于数据分析的Python发行版本,其包含Python和许多常用软件包,不同的Anaconda版本里面也配备了不同的Python版本,并且Python的出现时间比Anaconda早很多...
    99+
    2023-10-07
    python 开发语言 anaconda
  • tensorflow和python版本对应关系
    参考在 Windows 环境中从源代码构建  |  TensorFlow (google.cn) 复制以便收藏 CPU 版本Python 版本编译器构建工具tensorflow-2.6.03.6-3.9MSVC 2019Bazel 3.7....
    99+
    2023-09-15
    tensorflow python 深度学习
  • Pycharm与Python版本对应关系
    Pycharm支持的Python版本 一直用的老版本Pycharm(2018)和Python(3.6),最近需要用到python 3.10,电脑安装后发现配置Pycharm编译器时报[uns...
    99+
    2023-10-01
    python pycharm
  • Centos系统下Python版本升级
    这篇文章是之前写的在centos6下从python2.6升级到2.7的过程,升级到3.0以上版本的过程跟这个一样,懒得改了。在使用Centos6时,由于系统自带的python版本为2.6.6,而2.x版本中当前普遍使用的为2.7版本,所以...
    99+
    2023-01-31
    版本 系统 Centos
  • Ubuntu20.04下更新系统Python版本
    起因:写Python时报错: TypeError: unsupported operand type(s) for |=: ‘dict’ and ‘dict’ 原因:python3.9 支持对 ...
    99+
    2023-10-26
    python linux ubuntu
  • Navicat系列Mac版本安装及使用方法
    今天就跟大家聊聊有关Navicat系列Mac版本安装及使用方法,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Navicat for MySQL fo...
    99+
    2024-04-02
  • Python和Anaconda的版本对应关系
    原文链接 Python和Anaconda的版本对应关系如下: Packages included in Anaconda 2022.10 for 64-bit Linux on x86_64 CPUs...
    99+
    2023-09-14
    python 开发语言 linux
  • pytorch(torchvision)和python对应版本关系
    pytorch(torchvision)和python对应版本关系 当你在确定好pytorch的版本后,https://pytorch.org/get-started/previous-version...
    99+
    2023-09-10
    python pytorch 深度学习
  • 升级 Linux 系统中的 Python 版本
    升级 Linux 系统中的 Python 版本 Python 是一种非常流行的编程语言,广泛应用于各种领域,包括 Web 开发、数据分析等。而对于 Linux 系统来说,Python 更是一个必须的组...
    99+
    2023-09-05
    linux 运维 服务器 python
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作