import requests
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from urllib.parse import quote_plus as urlquote
from sqlalchemy import Table
# Database configuration
class BaseConfig:
    """Connection settings for the MySQL database and the derived SQLAlchemy URI.

    The individual parts are exposed as class attributes and are combined
    into ``SQLALCHEMY_DATABASE_URI`` when the class body is executed.
    """

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a secrets store.
    DRIVER = 'mysql'
    USER = 'root'
    PASSWORD = 'Admin@123'
    SERVER = '127.0.0.1'
    PORT = '3306'
    DATABASE = 'USER_PROFILE'

    # The password is passed through quote_plus so that characters such as
    # '@' are percent-encoded and cannot be mistaken for URL delimiters.
    SQLALCHEMY_DATABASE_URI = '{}+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(
        DRIVER,
        USER,
        urlquote(PASSWORD),
        SERVER,
        PORT,
        DATABASE,
    )
class InsertData:
    """Fetch one profile record from an HTTP API and insert it into MySQL.

    The record is requested from ``<main_url><product_id>?mobile=<mobile>``
    and, when no row with the same ``MOBILE_NUMBER`` exists yet, inserted
    into the table that ``table_list`` maps ``product_id`` to.
    """

    # Tables that can be synchronised, keyed by product (API endpoint) ID.
    table_list = {
        '01': 'MOBILE_USER_PROFILE',
        '02': 'INTERNET_USER_PROFILE',
        '03': 'TERMINAL_BRAND_PROFILE',
        '04': 'ACTIVITE_INTERNET_PROFILE',
        '05': 'ACTIVITE_PHONE_PROFILE',
        '06': 'FREQUENCY_PHONE_PROFILE',
        '07': 'FAMILY_USER_PROFILE',
        '08': 'JOB_USER_PROFILE',
        '09': 'DETAIL_USER_PROFILE',
        '10': 'CAR_USER_PROFILE',
        '11': 'SHORT_VIDEO_PROFILE',
        '12': 'LONG_VIDEO_PROFILE',
        '13': 'LIVE_VIDEO_PROFILE',
        '14': 'LOCATION_USER_PROFILE',
        '15': 'COMMUTE_USER_PROFILE',
        '16': 'FINANCE_USER_PROFILE',
        '17': 'HOUSING_USER_PROFILE',
        '18': 'BUSINESS_TRIP_PROFILE',
        '19': 'VIDEO_CONTENT_PROFILE',
        '20': 'FOOD_APP_PROFILE',
        '21': 'GRAVIDA_USER_PROFILE',
        '22': 'MUSIC_USER_PROFILE',
        '23': 'SPORT_USER_PROFILE',
        '24': 'MOVIE_USER_PROFILE',
        '25': 'NEWS_USER_PROFILE',
        '26': 'FORUM_USER_PROFILE',
    }

    # Seconds to wait for the profile API; without a timeout requests.get
    # can block forever on an unresponsive server. Override per instance or
    # subclass if a different limit is needed.
    request_timeout = 10

    def __init__(self, product_id, mobile, main_url):
        """Store the request parameters.

        Args:
            product_id: API endpoint ID; must be a key of ``table_list``.
            mobile: Value sent as the ``mobile`` query parameter.
            main_url: Base URL of the API; ``product_id`` is appended to it
                verbatim, so it is expected to end with ``/``.
        """
        self.product_id = product_id
        self.mobile = mobile
        self.main_url = main_url

    def connect_database(self):
        """Open a new engine, connection and session for the configured database.

        Returns:
            A ``(connection, metadata, session)`` tuple. The caller owns all
            three and is responsible for closing the connection and session
            and for disposing of the connection's engine.
        """
        # NOTE(review): MetaData(engine) relies on SQLAlchemy 1.x "bound
        # metadata"; confirm the pinned SQLAlchemy version before upgrading.
        engine = create_engine(BaseConfig.SQLALCHEMY_DATABASE_URI, echo=False)
        metadata = MetaData(engine)
        connector = engine.connect()
        session = sessionmaker(bind=engine)()
        return connector, metadata, session

    def get_data(self):
        """Request the profile record for ``self.mobile`` from the API.

        Returns:
            The decoded JSON body when the API answers with status 200;
            an empty dict when ``product_id`` is unknown or the API answers
            with any other status (the original comments say 404 means
            "not found").

        Raises:
            requests.exceptions.RequestException: on connection failure or
                when ``request_timeout`` elapses.
        """
        if self.table_list.get(self.product_id) is None:
            print('系统错误,请核查product_id')
            return {}

        url = self.main_url + '{}'.format(self.product_id)
        response = requests.get(
            url,
            params={'mobile': self.mobile},
            timeout=self.request_timeout,
        )
        if response.status_code != 200:
            return {}
        return response.json()

    def check_data(self):
        """Insert the fetched record unless its mobile number is already stored.

        The record is fetched exactly once; the same payload is used for the
        existence check and for the insert, so the row written is always the
        row that was checked. All database resources opened for the call are
        released before returning, even when an error occurs.
        """
        data = self.get_data()
        mobile_number = data.get('MOBILE_NUMBER')
        if not mobile_number:
            print('没有查到相关信息')
            return

        table_name = self.table_list.get(self.product_id)
        connector, metadata, session = self.connect_database()
        try:
            # Reflect the table so the insert only needs a column->value
            # mapping; no SQL string has to be assembled by hand.
            orm_table = Table(table_name, metadata, autoload=True)
            existing = (
                session.query(orm_table)
                .filter_by(MOBILE_NUMBER=mobile_number)
                .first()
            )
            if existing is None:
                # Only append when no row exists, to avoid duplicates.
                connector.execute(orm_table.insert(), data)
                print('数据已插入')
            else:
                print('数据已存在')
        finally:
            session.close()
            connector.close()
            # connect_database builds a fresh engine per call; dispose of
            # its pool so pooled connections are not left open.
            connector.engine.dispose()
# Smoke test: fetch the record for one sample mobile identifier from the
# local API and insert it if it is not stored yet.
# NOTE(review): these statements run whenever the module is imported, since
# there is no ``if __name__ == "__main__":`` guard.
main_url = 'http://127.0.0.1:5000/products/'
product_id = '11'
mobile = 'C17C402C57E170209975AA3DDE64368C'
x = InsertData(product_id=product_id, mobile=mobile, main_url=main_url)
x.check_data()