1. Parameterized query demo code
def get_classified_songs(self, telegram_id):
    # Bind the id through `params` so the driver escapes it; the original
    # interpolated it with str.format(), which invites SQL injection.
    conn = sqlite3.connect(self._DATABASE)
    sql = """
        SELECT
            danceability, energy, loudness, speechiness, acousticness,
            instrumentalness, liveness, valence, tempo, activity
        FROM songs s, users u, song_user su
        WHERE
            activity IS NOT NULL AND
            s.id = su.song_id AND
            su.user_id = u.id AND
            u.telegram_user_id = ?
    """
    resp = pd.read_sql_query(sql, conn, params=(telegram_id,))
    conn.close()
    return resp
Source: https://github.com/mongonauta/heydjbot/tree/master/demo3/server/database.py
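For comparison, here is a minimal self-contained sketch of the same parameterized-query pattern against an in-memory SQLite database; the table, columns, and value below are made up for illustration:

import sqlite3
import pandas as pd

# The driver escapes the bound value, so no string formatting is needed.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (telegram_user_id INTEGER, name TEXT)')
conn.execute("INSERT INTO users VALUES (42, 'alice')")

df = pd.read_sql_query(
    'SELECT * FROM users WHERE telegram_user_id = ?',
    conn,
    params=(42,),
)
conn.close()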
2. Building a DataFrame demo code
def build_df(table: str = 'articles',
             start_date: Optional[datetime] = None,
             end_date: Optional[datetime] = None) -> pd.DataFrame:
    """Build dataframe with derived fields."""
    with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
        articles = pd.read_sql_query(f'select * from {table}', conn)
    articles['date'] = pd.to_datetime(articles['publish_date'])
    if start_date:
        articles = articles.loc[articles['date'] >= start_date]
    if end_date:
        articles = articles.loc[articles['date'] <= end_date]
    articles = articles.replace([None], [''], regex=True)
    articles['base_url'] = articles.apply(get_url_base, axis=1)
    articles['word_count'] = articles.apply(count_words, axis=1)
    return articles
Source: https://github.com/bmassman/fake_news/blob/master/fake_news/pipeline/build_df.py
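The date-window filtering in build_df can be tried in isolation; a toy sketch with made-up rows and no database involved:

import pandas as pd

articles = pd.DataFrame({
    'publish_date': ['2017-01-01', '2017-02-15', '2017-03-30'],
    'title': ['a', 'b', 'c'],
})
articles['date'] = pd.to_datetime(articles['publish_date'])
start, end = pd.Timestamp('2017-01-15'), pd.Timestamp('2017-03-01')
# Boolean masks on the parsed datetime column select the window.
window = articles.loc[(articles['date'] >= start) & (articles['date'] <= end)]
print(window['title'].tolist())  # ['b']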
3. Join query demo code
def SensitivityQuery(self, table, data_set):
    """Return the number of times an analyte is found at each concentration,
    plus the number of repetitions in a particular data set."""
    # Table names cannot be bound as parameters, so they are interpolated;
    # the data-set name is bound via `params` (the original interpolated it too).
    sql_statement = """
        SELECT
            COUNT({t}.id) AS Count,
            {t}.Concentration_pg AS Conc_pg,
            DataSetConcentrations.Repetitions AS Repetitions
        FROM Sample
        INNER JOIN {t} ON
            {t}.id = Sample.{t}_foreignkey
        INNER JOIN DataSetConcentrations ON
            DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey
        WHERE Sample.DataSetName = ?
        GROUP BY Conc_pg
        ORDER BY Conc_pg;
    """.format(t=table)
    return pd.read_sql_query(sql_statement, self.conn, params=(data_set,))
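A self-contained sketch of the same join-and-aggregate shape, using a throwaway in-memory schema; the table and column names below are invented, not the ones from the snippet above:

import sqlite3
import pandas as pd

conn = sqlite3.connect(':memory:')
conn.executescript('''
    CREATE TABLE analyte (id INTEGER PRIMARY KEY, Concentration_pg REAL);
    CREATE TABLE Sample (id INTEGER PRIMARY KEY,
                         analyte_foreignkey INTEGER,
                         DataSetName TEXT);
    INSERT INTO analyte VALUES (1, 0.5), (2, 1.0);
    INSERT INTO Sample VALUES (1, 1, 'run1'), (2, 1, 'run1'), (3, 2, 'run1');
''')
df = pd.read_sql_query(
    '''SELECT a.Concentration_pg AS Conc_pg, COUNT(s.id) AS Count
       FROM Sample s
       INNER JOIN analyte a ON a.id = s.analyte_foreignkey
       WHERE s.DataSetName = ?
       GROUP BY Conc_pg
       ORDER BY Conc_pg''',
    conn,
    params=('run1',),
)
conn.close()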
4. Group-by query demo code
def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):
    # Build one column per analyte table, indexed by concentration,
    # then align the columns side by side with concat.
    df = pd.DataFrame()
    for table in analyte_table_lst:
        sql_statement = """
            SELECT
                {t}.Concentration_pg AS Conc,
                COUNT({t}.Concentration_pg) AS {t}
            FROM Sample
            INNER JOIN {t} ON
                {t}.id = Sample.{t}_foreignkey
            WHERE DataSetName = ?
            GROUP BY 1
            ORDER BY 1 ASC;
        """.format(t=table)
        df1 = pd.read_sql_query(sql_statement, self.conn, params=(data_set,))
        df1.set_index('Conc', inplace=True)
        df = pd.concat([df, df1], axis=1)
    return df
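The pd.concat(axis=1) step is what turns the per-analyte results into one wide table; a toy illustration with hand-built frames (the names here are hypothetical):

import pandas as pd

idx = pd.Index([0.5, 1.0], name='Conc')
a = pd.DataFrame({'analyte_a': [3, 3]}, index=idx)
b = pd.DataFrame({'analyte_b': [2, 4]}, index=idx)
wide = pd.concat([a, b], axis=1)  # columns are aligned on the shared index
print(wide)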
5. Loading data from PostgreSQL demo code
def import_data_from_psql(user_id):
    """Import data from psql; clean & merge dataframes."""
    # NB: SQLAlchemy 1.4+ accepts only the 'postgresql://' dialect name,
    # not the older 'postgres://' alias used here.
    library = pd.read_sql_table(
        'library',
        con='postgres:///nextbook',
        columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])
    book_subjects = pd.read_sql_table(
        'book_subjects',
        con='postgres:///nextbook')
    subjects = pd.read_sql_table(
        'subjects', con='postgres:///nextbook',
        columns=['subject_id', 'subject'])
    # Bind user_id via `params` (the original interpolated it with %).
    user_ratings = pd.read_sql_query(
        sql='SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s',
        con='postgres:///nextbook',
        params=(user_id,))
    library = library.merge(user_ratings, how='left', on='book_id')
    library['pages'] = library['pages'].fillna(0)
    # Merge subject names into book_subjects; drop uninteresting subjects.
    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
    delete_values = ["protected daisy", "accessible book", "in library", "overdrive",
                     "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n',
                     'lending library']
    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]
    return [library, book_subjects, subjects]
Source: https://github.com/EmmaOnThursday/next-book/blob/master/app/recommendation_creation.py
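The merge / fillna / isin-filter steps can be reproduced without a live PostgreSQL instance; a minimal sketch with made-up rows:

import pandas as pd

library = pd.DataFrame({'book_id': [1, 2], 'title': ['A', 'B'], 'pages': [300, None]})
user_ratings = pd.DataFrame({'book_id': [1], 'rating': [5]})
library = library.merge(user_ratings, how='left', on='book_id')  # left join keeps unrated books
library['pages'] = library['pages'].fillna(0)

book_subjects = pd.DataFrame({'book_id': [1, 2], 'subject': ['history', 'in library']})
book_subjects = book_subjects[~book_subjects['subject'].isin(['in library'])]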
6. Database connection demo code
def test_sql_open_close(self):
    # Test that database IO still works if the connection is closed
    # between the write and the read (as in many real situations).
    # NB: the `flavor` argument dates this snippet; it was deprecated
    # in pandas 0.19 and removed in later releases.
    with tm.ensure_clean() as name:
        conn = self.connect(name)
        sql.to_sql(self.test_frame3, "test_frame3_legacy", conn,
                   flavor="sqlite", index=False)
        conn.close()

        conn = self.connect(name)
        result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",
                                    conn)
        conn.close()

        tm.assert_frame_equal(self.test_frame3, result)
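On current pandas the same round trip looks like the sketch below; 'demo.db' is a throwaway file name:

import sqlite3
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})

conn = sqlite3.connect('demo.db')
df.to_sql('t', conn, index=False, if_exists='replace')
conn.close()  # close between write and read, as in the test above

conn = sqlite3.connect('demo.db')
result = pd.read_sql_query('SELECT * FROM t', conn)
conn.close()
pd.testing.assert_frame_equal(df, result)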
7. Datetime type demo code
def test_datetime(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.to_sql('test_datetime', self.conn)

    # with read_sql_table -> type information from the schema is used
    result = sql.read_sql_table('test_datetime', self.conn)
    result = result.drop('index', axis=1)
    tm.assert_frame_equal(result, df)

    # with read_sql_query -> no type information -> SQLite has no native
    # datetime type, so the column comes back as strings
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    result = result.drop('index', axis=1)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'])
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
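A standalone version of the SQLite branch, runnable on current pandas:

import sqlite3
import pandas as pd

df = pd.DataFrame({'A': pd.date_range('2013-01-01 09:00:00', periods=3)})
conn = sqlite3.connect(':memory:')
df.to_sql('test_datetime', conn, index=False)

# SQLite stores the datetimes as text, so read_sql_query returns strings
out = pd.read_sql_query('SELECT * FROM test_datetime', conn)
out['A'] = pd.to_datetime(out['A'])
pd.testing.assert_frame_equal(out, df)
conn.close()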
8. Datetime NaT handling demo code
def test_datetime_NaT(self):
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.loc[1, 'A'] = np.nan
    df.to_sql('test_datetime', self.conn, index=False)

    # with read_sql_table -> type information from the schema is used
    result = sql.read_sql_table('test_datetime', self.conn)
    tm.assert_frame_equal(result, df)

    # with read_sql_query -> no type information -> SQLite has no native
    # datetime type; NaT round-trips as NULL, hence errors='coerce'
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'], errors='coerce')
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
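And the NaT case as a standalone sketch: the missing value is written as NULL, and errors='coerce' maps it back to NaT on the way in:

import sqlite3
import pandas as pd

df = pd.DataFrame({'A': pd.date_range('2013-01-01 09:00:00', periods=3)})
df.loc[1, 'A'] = pd.NaT
conn = sqlite3.connect(':memory:')
df.to_sql('test_datetime_nat', conn, index=False)

out = pd.read_sql_query('SELECT * FROM test_datetime_nat', conn)
out['A'] = pd.to_datetime(out['A'], errors='coerce')
pd.testing.assert_frame_equal(out, df)
conn.close()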