返回顶部
首页 > 资讯 > 后端开发 > Python >python怎么结合shell自动创建kafka的连接器
  • 587
分享到

python怎么结合shell自动创建kafka的连接器

2023-06-30 11:06:09 587人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

这篇“python怎么结合shell自动创建kafka的连接器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python怎么

这篇“python怎么结合shell自动创建kafka的连接器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python怎么结合shell自动创建kafka的连接器”文章吧。

环境

cat /etc/redhat-release Centos linux release 7.5.1804 (Core) [root@localhost ~]# uname -aLinux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linuxpython -VPython 2.7.5

安装连接oracle的python包

pip install cx_Oracle==7.3

获取oracle表信息

cat query_oracle.py #!/usr/bin/env pythonimport cx_Oracleimport sysimport osimport csvimport tracebackfile = open("oracle.txt", 'w').close()user = "test"passwd = "test"listener = '10.0.2.15:1521/orcl'conn = cx_Oracle.connect(user, passwd, listener)cursor = conn.cursor()sql = "select table_name from user_tables" cursor.execute(sql)LIST1=[]while True:    row = cursor.fetchone()    if row == None:        break    for table in row:        #print table        LIST1.append(table)LIST2=[]for i in LIST1:    sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISioN,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i    cursor.execute(sql3)    cursor.execute(sql3)    row3 = cursor.fetchall()    for data in row3:        #LIST2.append(i)        LIST2.extend(list(data))    LIST2.append(i)    f=open('oracle.txt','a+')    print >> f,LIST2    LIST2=[]#f=open('test.txt','a+')#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student'); #select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');

去掉多余部分

cat auto.sh #!/bin/bash#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "#>oracle.txt>oracle_tables.txtcat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
cat oracle_tables.txt SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIMESNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE

shell 脚本处理表信息文件

cat connect.sh #!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)#清空自动创建连接器的脚本>create-connect.sh#循环临时文件每一行for i in `seq $FILE_NUM`do     FILE_LINE=$(sed -n ${i}p oracle_tables.txt)    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}')    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')    REAL_COL_NUM=`expr $COL_NUM - 2`    #清空临时文件    >${TABLE_NAME}.txt    >${TABLE_NAME}.sql    #循环临时文件每行列名所在的列    for j in `seq 1 4 $REAL_COL_NUM`    do        k=`expr $j + 1`        m=`expr $j + 2`        n=`expr $j + 3`        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})        #判断列的数据类型是否是NUMBER        if [ "$COL_DATA_TYPE" = "NUMBER" ]        then        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt        else        #循环拼接SQL查询中的列名部分,追加到临时文件中            echo "$COL_NAME" >> ${TABLE_NAME}.txt        fi    done    #拼接完整的SQL语句,追加到临时文件中    echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ${TABLE_NAME}.sql#循环追加每个表对应的连接器到自动创建连接器的脚本中cat >> create-connect.sh << EOFcurl -X POST Http://localhost:8083/connectors -H "Content-Type: application/JSON" -d '{"name": "jdbc_source_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl","connection.user": "{{ ORACLE_USER }}","connection.passWord": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_$TABLE_NAME","mode": "{{ CONNECT_MODE }}","query": "$(cat ${TABLE_NAME}.sql)"}}' >/dev/null 2>&1EOFdone

说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。

增强版处理表信息脚本

#!/bin/bash#获取临时文件的行数FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)#清空创建连接器的脚本并追加echos函数> create-jdbc-connect.shcat >> create-jdbc-connect.sh << EOF#!/bin/bashechos(){case \$1 inred)    echo -e "\033[31m \$2 \033[0m";;green)  echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue)   echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*)      echo "\$2";;esac}EOF> create-jdbc-connect-time.shcat >> create-jdbc-connect-time.sh << EOF#!/bin/bashechos(){case \$1 inred)    echo -e "\033[31m \$2 \033[0m";;green)  echo -e "\033[32m \$2 \033[0m";;yellow) echo -e "\033[33m \$2 \033[0m";;blue)   echo -e "\033[34m \$2 \033[0m";;purple) echo -e "\033[35m \$2 \033[0m";;*)      echo "\$2";;esac}EOF#创建表相关文件目录mkdir -p ./TABLE_TIME#循环临时文件每一行for i in `seq $FILE_NUM`do     FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt)    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}')    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')    REAL_COL_NUM=`expr $COL_NUM - 2`    #清空临时文件    >./TABLE_TIME/${TABLE_NAME}_time.txt    >./TABLE_TIME/${TABLE_NAME}_time.sql    >./TABLE_TIME/${TABLE_NAME}.sql    #循环临时文件每行列名所在的列    for j in `seq 1 4 $REAL_COL_NUM`    do        k=`expr $j + 1`        m=`expr $j + 2`        n=`expr $j + 3`        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})        #判断列的数据类型是否是NUMBER        if [ "$COL_DATA_TYPE" = "NUMBER" ]        then        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt        else        #循环拼接SQL查询中的列名部分,追加到临时文件中            echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt        fi        #判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中        TIME_COL=({{ TABLE_TIME_COL }})        for TIME in ${TIME_COL[@]}        do            if [ "$COL_NAME" = "$TIME" ]            then                echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt            fi        done    done    #拼接完整的SQL语句,追加到临时文件中    if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]    then    #echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql    else        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql    fi#循环追加每个表对应的连接器到自动创建连接器的脚本中if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]thencat >> create-jdbc-connect-time.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_time_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"else    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"fiEOFelsecat >> create-jdbc-connect.sh << EOF#创建表 $TABLE_NAME 连接器的命令如下curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{"name": "jdbc_$TABLE_NAME","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}","connection.user": "{{ ORACLE_USER }}","connection.password": "{{ ORACLE_PASSWD }}","topic.prefix": "YC_${TABLE_NAME}_INSERT","poll.interval.ms": "86400000","mode": "{{ CONNECT_MODE }}","numeric.mapping": "best_fit","query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"}}' >/dev/null 2>&1#判断连接器是否创建成功if [ \$? -eq 0 ]then    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"else    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"fiEOFfidone

以上就是关于“python怎么结合shell自动创建kafka的连接器”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网Python频道。

--结束END--

本文标题: python怎么结合shell自动创建kafka的连接器

本文链接: https://lsjlt.com/news/328455.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • python怎么结合shell自动创建kafka的连接器
    这篇“python怎么结合shell自动创建kafka的连接器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python怎么...
    99+
    2023-06-30
  • python结合shell自动创建kafka的连接器实战教程
    目录环境安装连接oracle的python包获取oracle表信息去掉多余部分shell 脚本处理表信息文件增强版处理表信息脚本环境 cat /etc/redhat-release ...
    99+
    2024-04-02
  • Python中怎么创建mysql数据库连接池
    这篇文章给大家介绍Python中怎么创建mysql数据库连接池,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。安装为顶层模块 如果你打算在除了Webware之外的程序中使用,推荐安装为顶层模块:python s...
    99+
    2023-06-17
  • Python与Bash的完美结合:如何在Linux上创建接口?
    在Linux系统下,Bash是一种非常流行的命令行解释器,而Python则是一种强大的脚本语言。这两种语言各有所长,但是它们的结合可以让我们在Linux系统上创建非常强大的接口。在本文中,我们将介绍如何使用Python和Bash创建一个简...
    99+
    2023-07-09
    bash linux 接口
  • 自己建云服务器怎么建立连接
    建立连接的方法有很多,下面是一些基本的步骤: 确认您的服务器是使用公共云或私有云来存储数据的,因为云服务器的本质上是一个大型存储设备,可以为许多人提供服务,因此您需要确保您的服务器使用公共云存储的选项。 连接之前,您需要使用网络连接来确...
    99+
    2023-10-26
    服务器
  • python接口自动化框架怎么搭建
    要搭建Python接口自动化框架,可以按照以下步骤进行: 确定需要使用的Python库:一般情况下,需要使用requests库来...
    99+
    2023-10-23
    python
  • 企业自建云服务器怎么建立连接
    首先,在选择云服务器提供商时,需要注意一些重要的因素,如服务器的稳定性和安全性等。企业需要选择具有良好口碑和安全性能的云服务器提供商,以确保服务器的稳定性和安全性。一些大型云服务商,如AWS、阿里云等,具有丰富的经验和强大的技术能力,能够提...
    99+
    2023-10-27
    服务器 企业
  • 怎么连接云服务器自己搭建的mysql
    要连接到云服务器上自己搭建的MySQL数据库,需要进行以下步骤:1. 确保云服务器上已经安装了MySQL数据库,并已经正确配置了相关...
    99+
    2023-09-21
    云服务器 mysql
  • 自己建云服务器怎么建立连接方式
    连接协议:选择一个支持多种协议的云服务器提供商。比如,亚马逊AWS,谷歌云,微软Azure等。这些云服务提供商都提供了不同的连接选项和连接选项协议。 选择服务:根据需要,选择所需的云服务器提供商的服务。比如,您需要一个数据库存储服务,可以...
    99+
    2023-10-28
    方式 服务器
  • 如何解决spring websocket自动断开连接再创建引发的问题
    这篇文章主要介绍如何解决spring websocket自动断开连接再创建引发的问题,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!问题:由于 web session 超时时间为 30...
    99+
    2024-04-02
  • Python怎么创建属于自己的IP池
    这篇“Python怎么创建属于自己的IP池”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python怎么创建属于自己的IP池...
    99+
    2023-06-30
  • Python自动化办公之Word文档怎么创建与生成
    这篇文章主要介绍了Python自动化办公之Word文档怎么创建与生成的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Python自动化办公之Word文档怎么创建与生成文章都会有所收获,下面我们一起来看看吧。保存生...
    99+
    2023-06-30
  • 基于Python的shell自动化框架ShutIt怎么用
    今天就跟大家聊聊有关基于Python的shell自动化框架ShutIt怎么用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。ShutIt是一个易于使用的基于shell的自动化框架。它对...
    99+
    2023-06-17
  • 自动拨号服务器连接失败怎么解决
    1. 检查网络连接:确保您的计算机与互联网连接正常,可以访问其他网站或服务。2. 检查服务器设置:确保您已正确配置自动拨号服务器的地...
    99+
    2023-06-08
    自动拨号服务器 服务器
  • 腾讯云服务器自动断开连接怎么办
    如果您的腾讯云服务器出现自动断开连接的情况,可以按照以下步骤进行操作: 检查连接设置:确保云服务器的连接设置为可信连接或强制重试。确保连接状态为可用或等待时间较长。 重新启动云服务器:重新启动您的云服务器可以解决这个问题。如果是由于网络...
    99+
    2023-10-26
    腾讯 服务器
  • 怎么连接自己的云服务器
    如果您想连接自己的云服务器,可以按照以下步骤进行操作: 打开互联网浏览器(如Google、Facebook等)或使用云浏览器(如AWS、阿里云等),在浏览器中搜索云服务器的相关信息。例如,您可以搜索“云服务器”或“云服务器提供商”。 在...
    99+
    2023-10-26
    自己的 服务器
  • Python中的迭代器怎么创建
    这篇文章主要介绍“Python中的迭代器怎么创建”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Python中的迭代器怎么创建”文章能帮助大家解决问题。什么是迭代器什么是python迭代器呢? 举一个...
    99+
    2023-07-06
  • SAP MM怎么看自定义移动类型是怎么创建的
    这篇文章主要介绍“SAP MM怎么看自定义移动类型是怎么创建的”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“SAP MM怎么看自定义移动类型是怎么创建的”文章能帮助大家解决问题。【SAP技术】SAP...
    99+
    2023-06-05
  • 腾讯云服务器自动断开连接怎么回事
    如果您的腾讯云服务器出现自动断开连接的情况,您可以按照以下步骤尝试解决问题: 检查您的操作系统版本是否与您的服务器版本相符。在腾讯云服务器的系统设置中,您可能需要更改网络设置,以便您的计算机可以通过网络连接进行连接。 检查您的网络连接硬...
    99+
    2023-10-26
    腾讯 怎么回事 服务器
  • 怎么在Python中利用Telnet实现自动连接检测密码
    这篇文章给大家介绍怎么在Python中利用Telnet实现自动连接检测密码,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。# encoding=utf-8import telnetlibimport&...
    99+
    2023-06-14
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作