20.multi_case07

时间：2022-07-25
本文章向大家介绍 20.multi_case07，主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项，具有一定的参考价值，需要的朋友可以参考一下。示例代码使用 asyncio + aiohttp（可经 SOCKS5 代理）并发抓取非小号（feixiaohao）的币种页面，用 lxml 解析行情字段，并把结果批量 POST 到本地服务。
# coding:utf-8

import re
import ssl
import csv
import json
import time
import random
import asyncio
import aiohttp
import requests
from lxml import etree
from asyncio.queues import Queue
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest


class Common:
    """Shared mutable state used by every coroutine in this crawler."""

    # Reference figures filled in once at startup; both stay 0 until then.
    market_cap_all = 0  # total market cap, denominator for each coin's share
    currency_rate = 0  # rate used to convert the RMB-denominated 24h volume

    # Work flows: task_queue -> worker coroutines -> result_queue -> pusher.
    task_queue = Queue()
    result_queue = Queue()


# Production: SOCKS5 proxies on the internal (intranet) network.
# download() picks one of these at random for each call when in prod mode.
socks5_address_prod = [
    'socks5://10.1.100.253:1235',
    'socks5://10.1.100.51:1235',
    'socks5://10.1.100.70:1235',
    'socks5://10.1.100.205:1235',
    'socks5://10.1.100.73:1235'
]

# Office network: SOCKS5 proxies.
# NOTE(review): this list is not referenced anywhere in the visible code, and
# 18.208.81.123 appears twice (which would double its weight in a random
# pick) — confirm whether the list and the duplicate are intended.
socks5_address_dev = [
    'socks5://18.208.81.123:1235',
    'socks5://34.197.217.25:1235',
    'socks5://52.20.255.43:1235',
    'socks5://34.237.163.87:1235',
    'socks5://18.208.81.123:1235',
    'socks5://52.0.114.155:1235'
]

# Deployment mode read by download(): "dev" sends requests without a SOCKS
# proxy; the production branch draws a proxy from socks5_address_prod.
DEPLOY_MODE = "dev"


async def session_get(session, url, socks):
    """GET *url* through *session* and return ``(body_text, status)``.

    Args:
        session: an open ``aiohttp.ClientSession`` (``download`` builds it with
            aiosocksy's ``ProxyConnector`` / ``ProxyClientRequest``).
        url: absolute URL to fetch.
        socks: SOCKS5 proxy URL such as ``'socks5://host:port'``, or ``None``
            to connect directly.

    Returns:
        Tuple of the decoded response body and the HTTP status code.
    """
    auth = Socks5Auth(login='...', password='...')
    headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
    timeout = aiohttp.ClientTimeout(total=20)
    # Certificate verification stays disabled, exactly as the bare
    # ``ssl.SSLContext()`` did; the context is built explicitly because the
    # no-argument constructor is deprecated since Python 3.10.
    # check_hostname must be turned off before verify_mode can be CERT_NONE.
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    # ``async with`` guarantees the connection is released back to the pool
    # even if reading or decoding the body raises.
    async with session.get(
        url,
        proxy=socks,
        proxy_auth=auth,
        timeout=timeout,
        headers=headers,
        ssl=ssl_context,
    ) as response:
        return await response.text(), response.status


async def download(url):
    """Fetch *url* and return ``(html_text, status)``.

    In prod mode the request goes through a randomly chosen SOCKS5 proxy from
    ``socks5_address_prod``; in dev mode it connects directly. A short body
    (fewer than 1000 characters) containing ``window.kk='<url>'`` is treated as
    a JavaScript redirect stub, and the embedded URL is fetched once instead.

    Raises:
        ValueError: if ``DEPLOY_MODE`` is neither "dev" nor "prod"
            (case-insensitive).
        IndexError: if a redirect stub does not use the ``window.kk='...'``
            quoting form.
    """
    # Accept any casing so "prod" (matching the lower-case "dev") and the
    # legacy "Prod" both work; an unknown mode used to leave ``socks`` unbound.
    mode = DEPLOY_MODE.lower()
    if mode == "dev":
        socks = None
    elif mode == "prod":
        socks = random.choice(socks5_address_prod)
    else:
        raise ValueError('unknown DEPLOY_MODE: %r' % (DEPLOY_MODE,))

    connector = ProxyConnector()
    async with aiohttp.ClientSession(
            connector=connector,
            request_class=ProxyClientRequest
    ) as session:
        ret, status = await session_get(session, url, socks)
        if 'window.kk' in ret and len(ret) < 1000:
            # Redirect stub: extract the quoted target URL and fetch it.
            url = ret.split("window.kk='")[1].split("'")[0]
            ret, status = await session_get(session, url, socks)
        return ret, status


# Circulating supply is presumably rendered as digits plus an optional unit
# suffix (e.g. "21000000BTC" once commas are removed) — TODO confirm against
# the live page. Only the leading integer is kept. The previous pattern
# r'^(d+)(w+)$' had lost its backslashes and could never match a number.
_CIRCULATING_SUPPLY_RE = re.compile(r'^(\d+)\s*(\w*)$')

# Errors meaning "this optional field is unavailable": missing node
# (IndexError), no parsed tree (AttributeError, e.g. if etree.HTML returned
# None), unparsable number (ValueError), and an unset or zero divisor
# (ZeroDivisionError, TypeError).
_FIELD_ERRORS = (IndexError, AttributeError, ValueError, TypeError,
                 ZeroDivisionError)


def _xpath_text(tree, path, index=0):
    """Return the *index*-th string matched by XPath *path*, stripped.

    Raises IndexError if *path* matches fewer than ``index + 1`` nodes.
    """
    return tree.xpath(path)[index].strip()


async def parse_html(cid, url, response):
    """Scrape one coin page into a result record.

    Every field in ``value`` is optional. A field is omitted when it is missing
    from the page, shows a placeholder ('?' or '-'), or cannot be parsed.

    Args:
        cid: identifier taken from the first column of the mapping CSV row.
        url: page URL, recorded as-is in the result.
        response: page HTML text.

    Returns:
        dict with keys 'url', 'cid', 'time' (Unix seconds) and 'value'
        (a dict of the scraped and derived fields).
    """
    coin_info = {'url': url, 'cid': cid, 'time': int(time.time())}
    coin_value = {}

    tree = etree.HTML(response)

    # Price in USD; '?' marks an unknown value on the page.
    try:
        price_usd = _xpath_text(
            tree, '//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()'
        ).replace('$', '')
        if '?' not in price_usd:
            coin_value['price'] = float(price_usd)
    except _FIELD_ERRORS:
        pass

    # Price change, in percent.
    try:
        updown = _xpath_text(
            tree,
            '//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()'
        ).replace('%', '')
        coin_value['updown'] = float(updown)
    except _FIELD_ERRORS:
        pass

    # 24h volume is shown in RMB; dividing by Common.currency_rate presumably
    # converts it to the same currency as 'price'.
    try:
        volume_24h_rmb = _xpath_text(
            tree,
            '//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()'
        ).replace('¥', '').replace(',', '')
        coin_value['volume_24h'] = int(
            float(volume_24h_rmb) / Common.currency_rate)
    except _FIELD_ERRORS:
        pass

    try:
        circulating_supply = _xpath_text(
            tree,
            '//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()'
        ).replace(',', '')
        if '?' not in circulating_supply:
            match = _CIRCULATING_SUPPLY_RE.match(circulating_supply)
            if match:
                coin_value['circulating_supply'] = int(match.group(1))
    except _FIELD_ERRORS:
        pass

    # Derived fields are computed only when their inputs were scraped.
    price = coin_value.get('price')
    supply = coin_value.get('circulating_supply')
    if price and supply:
        coin_value['market_cap'] = price * supply

    market_cap = coin_value.get('market_cap')
    if market_cap and Common.market_cap_all:
        global_share = market_cap / Common.market_cap_all
        if global_share < 0.001:
            coin_value['global_share'] = '<0.1%'
        else:
            coin_value['global_share'] = str(round(global_share * 100, 2)) + '%'

    # NOTE(review): circulation_rate and turnover_rate read the 1st and 2nd
    # matches of the SAME XPath; confirm this is intended rather than a
    # charbox[2] typo.
    rate_path = (
        '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()'
    )
    try:
        circulation_rate = _xpath_text(tree, rate_path, 0)
        if '?' not in circulation_rate:
            coin_value['circulation_rate'] = circulation_rate
    except _FIELD_ERRORS:
        pass

    try:
        turnover_rate = _xpath_text(tree, rate_path, 1)
        if '?' not in turnover_rate:
            coin_value['turnover_rate'] = turnover_rate
    except _FIELD_ERRORS:
        pass

    # Issue date; '-' is the page's placeholder for "unknown".
    try:
        issue_time = _xpath_text(
            tree, '//div[@class="infoList"]/div[1]/div[1]/span[2]/text()')
        if issue_time != '-':
            coin_value['issue_time'] = issue_time
    except _FIELD_ERRORS:
        pass

    # Presumably the number of exchanges; the '家' counter-word suffix is
    # stripped before conversion.
    try:
        exchange_num = _xpath_text(
            tree, '//div[@class="infoList"]/div[3]/div[1]/span[2]/text()'
        ).replace('家', '')
        coin_value['exchange_num'] = int(exchange_num)
    except _FIELD_ERRORS:
        pass

    try:
        total_circulation = _xpath_text(
            tree, '//div[@class="infoList"]/div[2]/div[2]/span[2]/text()'
        ).replace(',', '')
        coin_value['total_circulation'] = int(total_circulation)
    except _FIELD_ERRORS:
        pass

    coin_info['value'] = coin_value
    return coin_info


async def down_and_parse_task(queue):
    """Worker coroutine: crawl rows from *queue* until it is empty.

    Each queue item is a CSV row whose first two columns are ``cid`` and
    ``url``. Each page gets up to three attempts. Every parsed result is
    printed and put on ``Common.result_queue``.
    """
    while True:
        try:
            row = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # no work left: this worker is done
        if len(row) < 2:
            continue  # malformed row: skip it instead of stopping the worker
        cid, url = row[:2]

        last_error = None
        for _ in range(3):
            try:
                html, status = await download(url)
                # One immediate re-fetch on a non-200 status, and one more if
                # the body is the site's "access control denied your request"
                # page.
                if status != 200:
                    html, status = await download(url)
                if '访问控制拒绝了你的请求' in html:
                    html, status = await download(url)
                html_parse_result = await parse_html(cid, url, html)
                print(html_parse_result)
                await Common.result_queue.put(html_parse_result)
                break
            except asyncio.CancelledError:
                # Never swallow cancellation (it subclasses Exception on
                # Python < 3.8).
                raise
            except Exception as exc:
                last_error = exc
                await asyncio.sleep(0.2)
        else:
            # Every attempt failed: report it rather than dropping the row silently.
            print('failed', cid, url, repr(last_error))


async def push(data):
    """POST *data*, serialized as JSON text, to the local collector endpoint.

    Up to three attempts are made. Failures are printed rather than raised,
    so pushing is best-effort.
    """
    url = 'http://127.0.0.1:8000/aaa'
    try:
        payload = json.dumps(data)
    except (TypeError, ValueError) as e:
        print(e)  # not serializable: retrying cannot help
        return
    for _ in range(3):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, data=payload) as response:
                    response.raise_for_status()
            # Delivered: stop here. Without this return, every batch was
            # POSTed three times.
            return
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(e)
            await asyncio.sleep(0.2)


async def speed_monitor():
    """Print the task-queue drain rate (items per second) every 5 seconds.

    Returns as soon as ``Common.task_queue`` is observed to be empty.
    """
    window_seconds = 5
    while True:
        backlog_before = Common.task_queue.qsize()
        if not backlog_before:
            break
        await asyncio.sleep(window_seconds)
        backlog_after = Common.task_queue.qsize()
        print('=================')
        print('speed = ', (backlog_before - backlog_after) / window_seconds)


def _count_pending_tasks():
    """Return how many tasks on the current event loop are not finished yet."""
    all_tasks = getattr(asyncio, 'all_tasks', None)
    if all_tasks is not None:
        # Python 3.7+: returns only pending tasks of the running loop.
        return len(all_tasks())
    # Python < 3.7 fallback: Task.all_tasks() also includes finished tasks.
    return sum(1 for task in asyncio.Task.all_tasks() if not task.done())


async def monitor_finish(max_remaining=3, poll_interval=1, grace_period=5):
    """Raise ``SystemExit`` once only a few long-running tasks remain.

    Polls every *poll_interval* seconds until at most *max_remaining* tasks
    are still pending. It then waits *grace_period* seconds, presumably so the
    periodic result pusher can flush, and raises ``SystemExit`` to stop the
    loop. The default of 3 presumably counts this monitor plus the endless
    pusher and time-limit tasks — TODO confirm against ``main``.

    ``asyncio.Task.all_tasks()``, used previously, was removed in Python 3.9.
    """
    while _count_pending_tasks() > max_remaining:
        await asyncio.sleep(poll_interval)
    await asyncio.sleep(grace_period)
    raise SystemExit()


async def push_results():
    """Loop forever: every 3 s, drain ``Common.result_queue`` and push it.

    If a cycle raises, the traceback is printed. The batch is cleared only
    after ``push`` returns, so an undelivered batch is retried next cycle.
    """
    import traceback

    batch = []
    while True:
        try:
            await asyncio.sleep(3)
            # The qsize() snapshot bounds the drain; get_nowait() never blocks.
            for _ in range(Common.result_queue.qsize()):
                batch.append(Common.result_queue.get_nowait())
            if batch:
                await push(batch)
                batch.clear()
        except asyncio.CancelledError:
            raise
        except Exception:
            # Narrower than BaseException, so cancellation, SystemExit and
            # KeyboardInterrupt can still stop this otherwise endless loop.
            print(traceback.format_exc())


async def get_marketcap(timeout=20):
    """Fetch the total market cap and store it in ``Common.market_cap_all``.

    Args:
        timeout: seconds to wait for the HTTP response (default 20, the same
            budget ``session_get`` uses).
    """
    url = 'https://dncapi.feixiaohao.com/api/home/global?webp=0'
    # requests is synchronous: without a timeout a stalled server would block
    # this call, and with it the whole event loop, forever.
    response = requests.get(url, timeout=timeout)
    response_json = json.loads(response.text)
    marketcap = response_json['data']['marketcapvol']
    Common.market_cap_all = int(marketcap)


async def get_currency_rate(timeout=20):
    """Fetch an exchange rate (its 'cny' value) into ``Common.currency_rate``.

    Args:
        timeout: seconds to wait for the HTTP response.
    """
    url_rate = 'https://dncapi.feixiaohao.com/api/coin/web-rate/'
    # requests is synchronous: bound the wait so a hung request cannot stall
    # the event loop indefinitely.
    response = requests.get(url_rate, timeout=timeout)
    # NOTE(review): index 11 is a hard-coded position in the returned list,
    # presumably the USD entry. It breaks silently if the API reorders its
    # entries — TODO confirm and select by currency code instead.
    currency_rate = json.loads(response.text)[11]['cny']
    Common.currency_rate = currency_rate

# Hard cap on total crawl time. The crawl budget is 300 seconds; the default
# of 280 s presumably leaves headroom below it — TODO confirm.
async def time_limit(seconds=280):
    """Sleep *seconds*, then raise ``SystemExit`` to stop the crawler."""
    await asyncio.sleep(seconds)
    raise SystemExit()


async def main(concurrency=10):
    """Load crawl targets, fetch reference data, and start all tasks.

    Args:
        concurrency: number of ``down_and_parse_task`` workers to start.
    """
    # Use the loop this coroutine runs on instead of a global that exists
    # only when the module is executed as a script.
    loop = asyncio.get_event_loop()

    # newline='' is what the csv module expects; ``with`` closes the file.
    with open('feixiaohao_mapping_data.csv', encoding='utf-8',
              newline='') as csv_file:
        for row in csv.reader(csv_file):
            # Only rows whose second column is an https URL are crawl targets.
            if len(row) > 1 and row[1].startswith('https'):
                await Common.task_queue.put(row)
    print(Common.task_queue)

    await get_marketcap()
    print('总市值', Common.market_cap_all)  # total market cap

    await get_currency_rate()
    print('汇率', Common.currency_rate)  # exchange rate

    for _ in range(concurrency):
        loop.create_task(down_and_parse_task(Common.task_queue))
    # Monitors, the pusher and the time limit are singletons. Starting one per
    # worker ran ten copies of each, so monitor_finish's "only a few tasks
    # left" condition could never be met.
    loop.create_task(monitor_finish())
    loop.create_task(speed_monitor())
    loop.create_task(push_results())
    loop.create_task(time_limit())

if __name__ == '__main__':
    # asyncio.get_event_loop() warns (3.12) or raises RuntimeError (3.14+)
    # when no loop is set, so fall back to creating and installing one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.create_task(main())
    # Runs until a task raises SystemExit (time_limit or monitor_finish).
    loop.run_forever()