python 阿里云oss实现直传签名与回调验证的示例方法

作者:weixin_54126636 时间:2021-12-08 00:30:18 

签名


import base64
import json
import time
from datetime import datetime
import hmac
from hashlib import sha1

access_key_id = ''
# 请填写您的AccessKeySecret。
access_key_secret = ''
# host的格式为 bucketname.endpoint ,请替换为您的真实信息。
host = ''
# callback_url为 上传回调服务器的URL,请将下面的IP和Port配置为您自己的真实信息。
callback_url = ""
# 用户上传文件时指定的前缀。
upload_dir = 'user-dir-prefix/'
expire_time = 1200
expire_syncpoint = int(time.time() + expire_time)

policy_dict = {
 'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'Z',
 'conditions': [
   {"bucket": "test-paige"},
   ['starts-with', '$key', 'user/test/']
 ]
}
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())
signature = base64.encodebytes(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest())

callback_dict = {
 'callbackUrl': callback_url,
 'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}&height=${imageInfo.height}&width=${'
         'imageInfo.width}',
 'callbackBodyType': 'application/json'
}

callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()

var = {
 'accessid': access_key_id,
 'host': host,
 'policy': policy_encode.decode(),
 'signature': signature.decode().strip(),
 'expire': expire_syncpoint,
 'callback': callback
}

回调验签


import asyncio
import base64
import time
import aiomysql
import rsa
from aiohttp import web, ClientSession
from urllib import parse
import uuid

def success(msg='', data=None):
 if data is None:
   data = {}
 dict_data = {
   'code': 1,
   'msg': msg,
   'data': data
 }
 return web.json_response(dict_data)

def failed(msg='', data=None):
 if data is None:
   data = {}
 dict_data = {
   'code': 0,
   'msg': msg,
   'data': data
 }
 return web.json_response(dict_data)

async def handle(request):
 """
 获取连接池
 :param web.BaseRequest request:
 :return:
 """
 authorization_base64 = request.headers['authorization']
 x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url']
 pub_key_url = base64.b64decode(x_oss_pub_key_url_base64.encode())
 authorization = base64.b64decode(authorization_base64.encode())
 path = request.path

async with ClientSession() as session:
   async with session.get(pub_key_url.decode()) as resp:
     pub_key_body = await resp.text()
     pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())
     body = await request.content.read()
     auth_str = parse.unquote(path) + '\n' + body.decode()
     parse_url = parse.parse_qs(body.decode())
     print(parse_url)
     try:
       rsa.verify(auth_str.encode(), authorization, pubkey)
       pool = request.app['mysql_pool']
       async with pool.acquire() as conn:
         async with conn.cursor() as cur:
           id = str(uuid.uuid4())
           url = parse_url['filename'][0]
           mime = parse_url['mimeType'][0]
           disk = 'oss'
           time_at = time.strftime("%Y-%m-%d %H:%I:%S", time.localtime())
           sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"
           await cur.execute(sql, (id, url, mime, disk, time_at, time_at))
           await conn.commit()
       dict_data = {
         'id': id,
         'url': url,
         'cdn_url': 'https://cdn.***.net' + '/' + url,
         'mime': mime,
         'disk': disk,
         'created_at': time_at,
         'updated_at': time_at,
       }
       return success(data=dict_data)
     except rsa.pkcs1.VerificationError:
       return failed(msg='验证错误')

async def init(loop):
 # 创建连接池
 mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                     user='', password='',
                     db='', loop=loop)

async def on_shutdown(application):
   """
   接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功.
   :param web.Application application:
   :return:
   """
   application['mysql_pool'].close()
   # 没有下面这句话会报错 RuntimeError: Event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错
   await application['mysql_pool'].wait_closed()

application = web.Application()
 application.on_shutdown.append(on_shutdown)
 # 把连接池放到 application 实例中
 application['mysql_pool'] = mysql_pool
 application.add_routes([web.get('/', handle), web.post('/oss', handle)])
 return application

if __name__ == '__main__':
 loop = asyncio.get_event_loop()
 application = loop.run_until_complete(init(loop))
 web.run_app(application, host='127.0.0.1')
 loop.close()

来源:https://blog.csdn.net/weixin_54126636/article/details/113731760

标签:python,直传签名,回调验证
0
投稿

猜你喜欢

  • python+opencv实现的简单人脸识别代码示例

    2021-06-15 01:02:27
  • python中的selenium实现自动向下滚动页面并指定最大滑动距离

    2021-09-04 20:44:03
  • 使用Python遍历文件夹实现查找指定文件夹

    2021-01-19 09:23:06
  • 使用Pycharm创建一个Django项目的超详细图文教程

    2021-07-13 10:47:17
  • 如何用Python合并lmdb文件

    2023-08-05 17:42:01
  • Python实现照片卡通化

    2021-03-29 18:45:40
  • python爬取m3u8连接的视频

    2023-06-18 13:40:21
  • Django项目在pycharm新建的步骤方法

    2021-01-19 11:46:32
  • MySQL临时表的使用方法详解

    2024-01-23 15:13:51
  • 解决Django cors跨域问题

    2021-07-02 08:42:36
  • python基于queue和threading实现多线程下载实例

    2023-02-04 09:58:22
  • python 显示数组全部元素的方法

    2021-03-20 13:15:01
  • 关于networkx返回图的邻接矩阵问题

    2021-04-09 13:10:35
  • Python函数装饰器的使用详解

    2023-04-30 14:47:24
  • python列表和字符串的三种逆序遍历操作

    2022-09-09 03:00:56
  • Python 函数那不为人知的一面

    2022-09-24 10:03:31
  • JS数组array元素的添加和删除方法代码实例

    2024-04-23 09:25:11
  • 用python给csv里的数据排序的具体代码

    2021-02-15 01:12:25
  • go语言获取系统盘符的方法

    2024-05-22 10:19:47
  • python抓取网站的图片并下载到本地的方法

    2022-05-19 23:45:08
  • asp之家 网络编程 m.aspxhome.com