radius挑战认证+对接华为防火墙实现SSLvpn +钉钉 双因子认证2

时间:2020-03-26
本文章向大家介绍radius挑战认证+对接华为防火墙实现SSLvpn +钉钉 双因子认证2,主要包括radius挑战认证+对接华为防火墙实现SSLvpn +钉钉 双因子认证2使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

上文,我们简单实现了 使用pyrad 搭建radius对接防火墙挑战认证 但是挑战码只在控制台输出就有点不太合适,我们接下来使用python对接钉钉,及时的把动态码发给钉钉使用

不说废话 上代码

# -*- coding: UTF-8 -*-
from pyrad import *
import socket
import pyrad.host
import random
import pymysql
from dingtalk import *
import requests
import json
BUFSIZE = 1024
KEY = b"testing123456789"
CHALLENGE = "666" #挑战码初始为666
appkey='xxx'#钉钉的appkey
appsecret='xxxx' #钉钉的appsecret 在钉钉的管理后台这两个参数都能找到
class RadiusServer(pyrad.host.Host):
    def __init__(self):
        dict = pyrad.dictionary.Dictionary("dictionary") #从通用的字典使用
        pyrad.host.Host.__init__(self, dict=dict)
    def get_challenge(self): #产生一个4字节的随机挑战码
        challenge = ""  #初始化挑战码
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        challenge = challenge + str(chr(random.randint(65,90)))
        return challenge
    def get_token(self):
        url = 'https://oapi.dingtalk.com/gettoken?appkey='+appkey+'&appsecret='+appsecret
        response = requests.get(url=url)
        result = response.json()
        errmsg = result['errmsg']
        print('获取token是否成功:',errmsg)
        try:
            access_token = result['access_token']
        except Exception as e:
            print(e)
            access_token = ''
        return access_token                 
    def check_pass(self, radpkt): 
        global CHALLENGE
        global access_token1
        conn=pymysql.connect(
            host='xxxx',
            port=3306,   
            user='root',
            password='xxxx',
            db='xxxx',#这都是数据库的参数 按照你实际的情况来
            charset='utf8'
        )
         # 拿到游标
        cursor=conn.cursor()
        user=radpkt["User-Name"][0]
        print(user)
        password=radpkt["User-Password"][0]
        print(password)
         # 执行sql语句 
        sql='select * from test where user = "%s" '% (user)
        res=cursor.execute(sql)
        pwd=cursor.fetchall()
        cursor.close()
        conn.close()     
        if radpkt.PwCrypt(CHALLENGE) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessAccept
            CHALLENGE = self.get_challenge() #挑战码使用过后就更换掉
            radpkt.AddAttribute("Reply-Message","PASSCODE Accept")
            print("AccessAccept")                
        elif radpkt.PwCrypt(pwd[0][1]) == radpkt["User-Password"][0]:
            radpkt.code = packet.AccessChallenge
            CHALLENGE = self.get_challenge()
            radpkt.AddAttribute("Reply-Message","Enter Token Code") 
            radpkt.AddAttribute("State",b'0x1122123231') #最好这个还是随机生成,好吧我有点偷懒
            #下面发送验证码
            access_token1 = self.get_token()
            userid=pwd[0][2]
            print('钉钉id为:',pwd[0][2])
            url = 'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token='+access_token1
            content = '你好,VPN验证码为'+CHALLENGE
            _neirong = {
                     'agent_id':'xxxx',#应用id钉钉后台查看
                     'userid_list':userid,
                     'msg':{
                            'msgtype':'text',
                            'text':{'content':content}}
                        }
            neirong = json.dumps(_neirong)
            response = requests.post(url=url,data=neirong)
            result = response.json()
            print(result)            
        else:
            radpkt.code = packet.AccessReject
            print("AccessReject")
    def get_pkt(self, pkt):
        get_pw = None
        get_name = None
        radpkt = self.CreateAuthPacket(packet=pkt) #解析请求报文
        #radpkt.code = packet.AccessChallenge
        radpkt.secret = KEY
        print("输出相关参数")
        for key in radpkt.keys():        
            print(key, radpkt[key])
            if key == "User-Password":
                get_pw = 1
            if key == "User-Name":
                get_name = 1       
        if 1 == get_pw and 1 == get_name:
            self.check_pass(radpkt)        
        return radpkt.ReplyPacket()         
ip_port = ('', 1812)
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # udp协议
server.bind(ip_port)


while True:
    data,client_addr = server.recvfrom(BUFSIZE)
    srv = RadiusServer()
    reply = srv.get_pkt(data)
    server.sendto(reply, client_addr)    

不得不说一下 钉钉官方的sdk还是python 2 有点小坑  。但是手册写的很清楚,参考手册还是能搞明白的。

原文地址:https://www.cnblogs.com/xihuananjing/p/12573128.html