useful-scripts

时间:2022-07-24
本文章向大家介绍useful-scripts,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

下面分享了几个python脚本,在日常工作中提高效率。

环境介绍

OS:MAC PRO
脚本语言:python3.7
编辑器:vscode/sublimetext3

python使用相对简单,快速便捷,很适合作为脚本开发;作为"资深"的sub3/vscode控,使用编辑器鞋脚本再也适合不过,vscode中的调试功能太好用啦:

生成海量csv数据文件

测试同学为了压测接口,让我帮忙提供不重复的数据,正好用python写了一个简单脚本:

# -*- coding: utf-8 -*-
import requests
import sys
import re
import csv
import random
'''
从csv文件中读取数据
'''
def readCsv():
# 读取csv至字典
  csvFile = open("/Users/lioswong/LiosWong/sublimetext/python/脚本/bindPhone.csv", "r")
  reader = csv.reader(csvFile)
  # 建立空字典
  result = {}
  for item in reader:
      # 忽略第一行
      if reader.line_num == 1:
        continue
      result[item[0]] = item[1]
  csvFile.close()
  print(result)

'''
往csv文件中写入数据
'''
def writerCsv():
  fileHeader = ["customerPhone", "orderNo","driverPhone","driverNo"]
  csvFile = open("/Users/lioswong/LiosWong/sublimetext/python/脚本/test6.csv", "w")
  writer = csv.writer(csvFile)
  d1 = [0]*4
  line = 1
  for i in range(0,1000001):
    if line==1:
        writer.writerow(fileHeader)
    d1[0]=random.choice(['177','156','159','188','199','139','152','188','133','185','170','136','189','158','178','151'])+"".join(random.choice("0123456789") for i in range(8))
    d1[1]="".join(random.choice("0123456789") for i in range(10))
    d1[2]=random.choice(['152','139','199','188','190','185','156','136','133','158','136','151','153'])+"".join(random.choice("0123456789") for i in range(8))
    d1[3]="".join(random.choice("0123456789") for i in range(8))
    # 写入的内容都是以列表的形式传入函数
    writer.writerow(d1)
    print(line)
    line+=1
  csvFile.close()


writerCsv()
# readCsv()

sql查询导出csv文件脚本

由于公司内部有严格的权限控制,sql查询导出需要提工单,流程繁琐,为了方便工作,写了下面脚本,可以支持任意sql的查询导出,只限于工作导出,当然大批量的爬取数据,公司的数据中心同学可能随时查水表:

#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import logging
import math
import time
import re
import requests
import sys
import csv
from lxml import etree
def exec(csrf_token, sql_content, dbase, dbconfig):
    headers = {
        # cookie需要用最新的,否则会失败
        "cookie": "UM_distinctid=1b396-06251-13c680-1727b86bdbca57; _ga=GA1.22421.1591582910; JSESSIONID=BaxMuUVZRUou-y4XBkUdcdNc_DMkGc4qhwSkOi59AoZhjxfHfhU3WAqr8pCrEQ1omC3IttHRV4bz2j5s1NW-l88_gC9V3A5.EeXDfg.DB2JoXb7DZUDLJcRSpQLALJtq8M",
    }

    url = "https://xxxxx/query/mysql"

    data = {'csrf_token': csrf_token, 'sql_content': sql_content,
            'dbase': dbase, 'dbconfig': dbconfig}

    re = requests.post(url, data=data, headers=headers)

    # 深坑,可能td节点数据为空,需要特殊处理
    new_text = re.text.replace(r'<td style="white-space: pre-wrap;"></td>',
                               '<td style="white-space: pre-wrap;">None</td>')
    html = etree.HTML(new_text)

    thead = html.xpath("//div[@class='x_content sql_result']/table/thead")[0]

    th = thead.xpath("//tr/th/text()")

    filterStr = ['数据库信息', '名称', '内容预览', '是否开放', '备注', '操作']
    fileHeader = []

    for x in th:
        if x not in filterStr:
            fileHeader.append(x)
        pass
    writerToCSV(fileHeader, html)


def writerToCSV(fileHeader, html):
    csvFile = open(
        "/Users/lioswong/LiosWong/sublimetext/Script-languages/python/draft/cc_sql_to_file.csv", "w", encoding='utf-8-sig')
    writer = csv.writer(csvFile)
    writer.writerow(fileHeader)

    hLen = len(fileHeader)
    tdData = html.xpath(
        "//div[@class='x_content sql_result']/table/tbody/tr/td/text()")
    col = 0
    d1 = []
    for i in tdData:
        if col == 0:
            d1 = [0 for i in range(hLen)]
        if col + 1 == hLen:
            d1[col] = i
            writer.writerow(d1)
            col = 0
            pass
        else:
            d1[col] = i
            col += 1
    csvFile.close()

exec('IjdhNjljZGI1OGTJiNWUyODIwZjEyNGI0Njg2YmYi.EeXC9g.egKcK0rKVP0aCMZCqDQvUUU0lRg',"select * from test order by id desc limit 100", 'xxxx', '从库-生产')

kafka Consumer Producer

开发环境、测试环境会遇到手动发送消息、消费消息的需要:

  • kafka Producer
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import json
import logging
logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="10.0.0.1:9092")  # 可接受多个Client,多个broker

def sendDevKafkaMsg(topic, message):
    try:
        topic = client.topics[topic]  # 选择一个topic
        producer = topic.get_producer(delivery_reports=True)
        producer.produce(bytes(message, encoding="utf8"))
        producer.get_delivery_report()  # 返回之前发送失败的消息和结果
    except Exception as e:
        print(e)

data = {"appVersion":"34900","cityCode":"0532"}
sendDevKafkaMsg("XXX_INFO_CHANGE",json.dumps(data2))
  • kafka Consumer
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import _thread
import threading
import json
import logging
# logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="10.0.0.1:9092")


def receiveMsg(topics):
    topic = client.topics[topics]  # 选择一个topic
    consumer = topic.get_simple_consumer(consumer_group='dev26-dc',
                                         reset_offset_on_start=False)
    partitions = topic.partitions
    offset_list = consumer.held_offsets
    print("当前消费者分区offset情况{}".format(offset_list))  # 消费者拥有的分区offset的情况
    consumer.reset_offsets([(partitions[0], 0)])  # 设置offset
    msg = consumer.consume()
    print("消费 :{}".format(msg.value.decode()))

while True:
    receiveMsg("DRIVER_SIGNING_TYPE_CHANGE")
    pass

dubbo_telnet自动化测试脚本

由于平时对外提供都是RPC接口,可以用单元测试、invoke调试本地接口,有时本地、开发环境、测试环境为了批量测试、压测RPC接口,该脚本可能会有用:

#!/usr/bin/python
# -*- coding:utf-8 -*-
import json
import telnetlib
import unittest
import time
import re
import datetime


class Dubbo(telnetlib.Telnet):

    prompt = 'dubbo>'
    coding = 'utf-8'

    def __init__(self, host=None, port=0):
        super().__init__(host, port)
        self.write(b'n')

    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"n")
        return data

    def invoke(self, service_name, method_name, args):
        command_str = "invoke {0}.{1}({2})".format(
            service_name, method_name, args)
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = json.loads(data.decode(
            Dubbo.coding, errors='ignore').split('n')[0].strip())
        return data


class qqTest(unittest.TestCase):
    #setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
    def setUp(self):
        '''
        dev: Dubbo('120.26.98.15', 6666)
        '''
        self.dubbo_conn = Dubbo('127.0.0.1', 6666)
        self.verificationErrors = []  # 脚本运行时,错误的信息将被打印到这个列表中#
        self.accept_next_alert = True  # 是否继续接受下一个警告#

    # 司机开始服务
    def test_start_service(self):
        data = {"class": "com.xxxxx.qq.api.dto.input.DriverStartServiceParam", "driverNo": 019,"lat": 30.206907583333, "lng": 120.22090913, "orderLabel": 128, "orderNo": 541, "bizType": 1}
        result = self.dubbo_conn.invoke(
            "com.xxxxx.qq.api.app.response.DriverOrderResponseApi", "startService", data)
        print(result)
        time.sleep(2)
    def tearDown(self):
        '''
        tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
        self.assertEqual([], self.verificationErrors) 是个难点,
        对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
        '''
        self.assertEqual([], self.verificationErrors)

class DriversssTest(unittest.TestCase):
    #setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
    def setUp(self):
        '''
        local: Dubbo('127.0.0.1', 8951)
        '''                      
        self.dubbo_conn = Dubbo('127.0.0.1', 8951)
        self.verificationErrors = []  # 脚本运行时,错误的信息将被打印到这个列表中#
        self.accept_next_alert = True  # 是否继续接受下一个警告#

    def test_old_bind_phone(self):
       data = [201689047,1221,"18058783246","18058755045"]
       result = self.dubbo_conn.invoke(
            "com.xxxxx.driver.sss.api.DriverVirtPhoneApi", "bindPhone", data)
       print(result)


    def queryVirtPhone(self):
        data = [782919,'18790899859']
        result = self.dubbo_conn.invoke(
            "com.xxxxx.driver.sss.api.DriverVirtPhoneApi", "queryVirtPhone", data)
        print(result)


    def tearDown(self):
        '''
        tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
        self.assertEqual([], self.verificationErrors) 是个难点,
        对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
        '''
        self.assertEqual([], self.verificationErrors)

'''
local
'''
class LocalTest(unittest.TestCase):
    #setUp 用于设置初始化的部分,在测试用例执行前,这个方法中的函数将先被调用 #
    def setUp(self):
        '''
        local1: Dubbo('127.0.0.1', 20880)
        '''
        self.dubbo_conn = Dubbo('127.0.0.1', 20880)
        self.verificationErrors = []  # 脚本运行时,错误的信息将被打印到这个列表中#
        self.accept_next_alert = True  # 是否继续接受下一个警告#
    # 测试sayHello
    def test_say_hello(self):
        for x in range(0,1):
            data = {"class":"com.alibaba.dubbo.demo.BeanParam","name":"lios","age":25+x}
            result = self.dubbo_conn.invoke(
            "com.alibaba.dubbo.demo.DemoService", "hello", data)
            print(result)
            pass
        time.sleep(2)

    def test_2(self):
        print(self.dubbo_conn)

    def tearDown(self):
        '''
        tearDown 方法在每个测试方法执行后调用,这个地方做所有清理工作,如退出浏览器等。
        self.assertEqual([], self.verificationErrors) 是个难点,
        对前面verificationErrors方法获得的列表进行比较;如查verificationErrors的列表不为空,输出列表中的报错信息。
        '''
        self.assertEqual([], self.verificationErrors)


'''
qq suite
'''
def qq_suite():
    suite = unittest.TestSuite()
    # suite.addTest(qqTest("test_update_driver_status"))
    # suite.addTest(qqTest("test_start_service"))
    suite.addTest(qqTest("test_orderConfirmV2"))
    
    return suite
    pass

'''
driver-sss suite
'''
def driver_sss_suite():
    suite = unittest.TestSuite()
    # suite.addTest(DriversssTest("test_pre_bind_phone"))
    # suite.addTest(DriversssTest("test_bind_phone"))
    # suite.addTest(DriversssTest("queryVirtPhone"))
    suite.addTest(DriversssTest("test_old_bind_phone"))
    return suite
    pass

'''
local suite
'''
def local_suite():
    suite = unittest.TestSuite()
    suite.addTest(LocalTest("test_say_hello"))
    return suite
    pass

if __name__ == "__main__":
    ## 指定suite套件
    unittest.main(defaultTest='driver_sss_suite')