example07.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. """
  2. example07.py - 将非异步的三方库封装为异步调用
  3. """
  4. import asyncio
  5. import concurrent
  6. import json
  7. import tornado
  8. import tornado.web
  9. import pymysql
  10. from pymysql import connect
  11. from pymysql.cursors import DictCursor
  12. from tornado.ioloop import IOLoop
  13. from tornado.options import define, parse_command_line, options
  14. from tornado.platform.asyncio import AnyThreadEventLoopPolicy
  15. define('port', default=8888, type=int)
  16. def get_mysql_connection():
  17. return connect(
  18. host='120.77.222.217',
  19. port=3306,
  20. db='hrs',
  21. charset='utf8',
  22. use_unicode=True,
  23. user='root',
  24. password='123456',
  25. )
  26. class HomeHandler(tornado.web.RequestHandler):
  27. executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
  28. async def get(self, no):
  29. return await self._get(no)
  30. @tornado.concurrent.run_on_executor
  31. def _get(self, no):
  32. con = get_mysql_connection()
  33. try:
  34. with con.cursor(DictCursor) as cursor:
  35. cursor.execute("select * from tb_dept where dno=%s", (no, ))
  36. if cursor.rowcount == 0:
  37. self.finish(json.dumps({
  38. 'code': 20001,
  39. 'mesg': f'没有编号为{no}的部门'
  40. }))
  41. return
  42. row = cursor.fetchone()
  43. self.finish(json.dumps(row))
  44. finally:
  45. con.close()
  46. async def post(self, *args, **kwargs):
  47. return await self._post(*args, **kwargs)
  48. @tornado.concurrent.run_on_executor
  49. def _post(self, *args, **kwargs):
  50. no = self.get_argument('no')
  51. name = self.get_argument('name')
  52. loc = self.get_argument('loc')
  53. conn = get_mysql_connection()
  54. try:
  55. with conn.cursor() as cursor:
  56. cursor.execute('insert into tb_dept values (%s, %s, %s)',
  57. (no, name, loc))
  58. conn.commit()
  59. except pymysql.MySQLError:
  60. self.finish(json.dumps({
  61. 'code': 20002,
  62. 'mesg': '添加部门失败请确认部门信息'
  63. }))
  64. else:
  65. self.set_status(201)
  66. self.finish()
  67. def main():
  68. asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
  69. parse_command_line()
  70. app = tornado.web.Application(
  71. handlers=[(r'/api/depts/(.*)', HomeHandler), ]
  72. )
  73. app.listen(options.port)
  74. IOLoop.current().start()
  75. if __name__ == '__main__':
  76. main()