transfer2pg.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # transfer2pg.py
  4. # Copyright (C) 2019-2021 github.com/googlehosts Group:Z
  5. #
  6. # This module is part of googlehosts/telegram-repeater and is released under
  7. # the AGPL v3 License: https://www.gnu.org/licenses/agpl-3.0.txt
  8. #
  9. # This program is free software: you can redistribute it and/or modify
  10. # it under the terms of the GNU Affero General Public License as published by
  11. # the Free Software Foundation, either version 3 of the License, or
  12. # any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  21. import asyncio
  22. from configparser import ConfigParser
  23. from datetime import datetime
  24. from typing import Any, Callable, Tuple, Union
  25. import aiomysql
  26. import asyncpg
  27. config = ConfigParser()
  28. config.read('config.ini')
  29. host = config.get('database', 'host')
  30. port = config.get('pgsql', 'port') # only for pgsql
  31. muser = config.get('database', 'user')
  32. mpasswd = config.get('database', 'passwd')
  33. puser = config.get('pgsql', 'user')
  34. ppasswd = config.get('pgsql', 'passwd')
  35. mdatabase = config.get('database', 'db_name')
  36. pdatabase = config.get('pgsql', 'database')
  37. async def main() -> None:
  38. pgsql_connection = await asyncpg.connect(host=host, port=port, user=puser, password=ppasswd, database=pdatabase)
  39. mysql_connection = await aiomysql.create_pool(
  40. host=host,
  41. user=muser,
  42. password=mpasswd,
  43. db=mdatabase,
  44. charset='utf8mb4',
  45. cursorclass=aiomysql.cursors.Cursor,
  46. )
  47. if input('Do you want to delete all data? [y/N]: ').strip().lower() == 'y':
  48. await clean(pgsql_connection)
  49. print('Clear database successfully')
  50. else:
  51. print('Skipped clear database')
  52. async with mysql_connection.acquire() as conn:
  53. async with conn.cursor() as cursor:
  54. await exec_and_insert(cursor, "SELECT * FROM answer_history", pgsql_connection,
  55. '''INSERT INTO "answer_history" VALUES ($1, $2, $3, $4)''')
  56. await exec_and_insert(cursor, "SELECT * FROM auth_user", pgsql_connection,
  57. '''INSERT INTO "auth_user" VALUES ($1, $2, $3, $4)''', transfer_stage_1)
  58. await exec_and_insert(cursor, "SELECT * FROM banlist", pgsql_connection,
  59. '''INSERT INTO "banlist" VALUES ($1)''')
  60. await exec_and_insert(cursor, "SELECT * FROM exam_user_session", pgsql_connection,
  61. '''INSERT INTO "exam_user_session" VALUES ($1, 1, $2, $3, $4, $5, $6, $7, $8)''',
  62. transfer_stage_2)
  63. await exec_and_insert(cursor, "SELECT * FROM msg_id", pgsql_connection,
  64. '''INSERT INTO "msg_id" VALUES ($1, $2, $3, $4)''')
  65. await exec_and_insert(cursor, "SELECT * FROM reasons", pgsql_connection,
  66. '''INSERT INTO reasons VALUES ($1, $2, $3, $4, $5)''')
  67. await exec_and_insert(cursor, "SELECT * FROM tickets", pgsql_connection,
  68. '''INSERT INTO tickets VALUES ($1, $2, $3, $4, $5, $6, $7)''')
  69. await exec_and_insert(cursor, "SELECT * FROM tickets_user", pgsql_connection,
  70. '''INSERT INTO tickets_user VALUES ($1, $2, $3, $4, $5, $6, $7)''',
  71. transfer_stage_3)
  72. await exec_and_insert(cursor, "SELECT * FROM username", pgsql_connection,
  73. '''INSERT INTO username VALUES ($1, $2, $3)''')
  74. await pgsql_connection.close()
  75. mysql_connection.close()
  76. await mysql_connection.wait_closed()
  77. def transfer_stage_1(obj: Tuple[int, str, str, str]) -> Tuple[Union[bool, Any], ...]:
  78. def str2bool(x: str) -> bool:
  79. return True if x == 'Y' else False
  80. return tuple(map(lambda x: str2bool(x) if isinstance(x, str) else x, obj))
  81. def transfer_stage_2(obj: Tuple[int, int, datetime, int, int, int, int, int]
  82. ) -> Tuple[int, int, datetime, bool, bool, bool, bool, int]:
  83. return tuple((*obj[:3], *(bool(obj[i]) for i in range(3, 7)), obj[7], ))
  84. def transfer_stage_3(obj: Tuple[int, datetime, datetime, int, datetime, int, str]
  85. ) -> Tuple[int, datetime, datetime, bool, datetime, int, str]:
  86. return tuple((*obj[:3], bool(obj[3]), *obj[4:]))
  87. async def exec_and_insert(cursor, sql: str, pg_connection, insert_sql: str,
  88. process: Callable[[Any], Any] = None) -> None:
  89. print('Processing table:', sql[13:])
  90. if await pg_connection.fetchrow(f'{sql} LIMIT 1') is not None:
  91. if input(f'Table {sql[13:]} has data, do you still want to process insert? [y/N]: ').strip().lower() != 'y':
  92. return
  93. await cursor.execute(sql)
  94. for sql_obj in await cursor.fetchall():
  95. if process is not None:
  96. sql_obj = process(sql_obj)
  97. # print(type(sql_obj), len(sql_obj), *sql_obj)
  98. await pg_connection.execute(insert_sql, *sql_obj)
  99. return
  100. await cursor.execute('''SELECT `AUTO_INCREMENT`
  101. FROM INFORMATION_SCHEMA.TABLES
  102. WHERE TABLE_SCHEMA = %s
  103. AND TABLE_NAME = 'TableName';''', mdatabase)
  104. obj = await cursor.fetchall()
  105. if len(obj):
  106. await pg_connection.execute('''ALTER SEQUENCE product_id_seq RESTART WITH $1''')
  107. async def clean(pgsql_connection: asyncpg.connection) -> None:
  108. await pgsql_connection.execute('''TRUNCATE "answer_history"''')
  109. await pgsql_connection.execute('''TRUNCATE "auth_user"''')
  110. await pgsql_connection.execute('''TRUNCATE "banlist"''')
  111. await pgsql_connection.execute('''TRUNCATE "exam_user_session"''')
  112. await pgsql_connection.execute('''TRUNCATE "msg_id"''')
  113. await pgsql_connection.execute('''TRUNCATE "reasons"''')
  114. await pgsql_connection.execute('''TRUNCATE "tickets"''')
  115. await pgsql_connection.execute('''TRUNCATE "tickets_user"''')
  116. await pgsql_connection.execute('''TRUNCATE "username"''')
  117. if __name__ == '__main__':
  118. asyncio.get_event_loop().run_until_complete(main())