1、参数查询示例demo代码
def get_classified_songs(self, telegram_id):
    """Return the audio features of all activity-labelled songs for one user.

    Parameters
    ----------
    telegram_id : int
        Telegram user id whose songs are selected.

    Returns
    -------
    pd.DataFrame
        One row per classified song: danceability, energy, loudness,
        speechiness, acousticness, instrumentalness, liveness, valence,
        tempo and the assigned activity label.
    """
    conn = sqlite3.connect(self._DATABASE)
    try:
        # `?` placeholder instead of str.format: avoids SQL injection and
        # lets the driver handle quoting/typing of the id.
        sql = """
            SELECT danceability, energy, loudness, speechiness, acousticness,
                   instrumentalness, liveness, valence, tempo, activity
            FROM songs s, users u, song_user su
            WHERE activity IS NOT NULL
              AND s.id = su.song_id
              AND su.user_id = u.id
              AND u.telegram_user_id = ?
        """
        resp = pd.read_sql_query(sql, conn, params=(telegram_id,))
    finally:
        # Close even when the query raises (original leaked the handle on error).
        conn.close()
    return resp
源代码:https://github.com/mongonauta/heydjbot/tree/master/demo3/server/database.py
2、创建Dataframe示例demo代码
def build_df(table: str = 'articles',
             start_date: Optional[datetime] = None,
             end_date: Optional[datetime] = None) -> pd.DataFrame:
    """Build dataframe with derived fields.

    Loads every row of *table*, parses ``publish_date`` into a ``date``
    column, optionally clips rows to [start_date, end_date], blanks out
    NULLs, and derives ``base_url`` and ``word_count`` per row.
    """
    # Only the read needs the connection; `closing` guarantees release.
    with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
        frame = pd.read_sql_query(f'select * from {table}', conn)

    frame['date'] = pd.to_datetime(frame['publish_date'])
    if start_date is not None:
        frame = frame.loc[frame['date'] >= start_date]
    if end_date is not None:
        frame = frame.loc[frame['date'] <= end_date]

    frame = frame.replace([None], [''], regex=True)
    frame['base_url'] = frame.apply(get_url_base, axis=1)
    frame['word_count'] = frame.apply(count_words, axis=1)
    return frame
源代码:https://github.com/bmassman/fake_news/blob/master/fake_news/pipeline/build_df.py
3、表连接查询示例demo代码
def SensitivityQuery(self, table, data_set):
    """Count analyte detections at each concentration for one data set.

    Parameters
    ----------
    table : str
        Analyte table name; also used to build the foreign-key column
        ``<table>_foreignkey`` on ``Sample``. Must be a trusted identifier:
        SQL cannot bind table names as parameters.
    data_set : str
        Matched against ``Sample.DataSetName``; bound safely via ``?``.

    Returns
    -------
    pd.DataFrame
        Columns ``Count``, ``Conc_pg``, ``Repetitions`` — one row per
        concentration, ordered by concentration.
    """
    sql_statement = """
        SELECT COUNT({t}.id) AS Count,
               {t}.Concentration_pg AS Conc_pg,
               DataSetConcentrations.Repetitions AS Repetitions
        FROM Sample
        INNER JOIN {t} ON
            {t}.id = Sample.{t}_foreignkey
        INNER JOIN DataSetConcentrations ON
            DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey
        WHERE Sample.DataSetName = ?
        GROUP BY Conc_pg
        ORDER BY Conc_pg;
    """.format(t=table)
    # data_set is user data: pass it as a bound parameter instead of
    # %-formatting it into the SQL text (the original was injectable).
    return pd.read_sql_query(sql_statement, self.conn, params=(data_set,))
4、分组查询示例demo代码
def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):
    """Tabulate repetition counts per concentration for several analytes.

    For each analyte table, counts how many ``Sample`` rows of *data_set*
    reference each concentration, then joins the per-analyte counts into a
    single frame indexed by concentration (one column per analyte table).

    Parameters
    ----------
    analyte_table_lst : list[str]
        Trusted analyte table names (identifiers cannot be bound as SQL
        parameters, so they are interpolated).
    data_set : str
        Matched against ``Sample.DataSetName``; bound safely via ``?``.

    Returns
    -------
    pd.DataFrame
        Index ``Conc``; one count column named after each analyte table.
    """
    df = pd.DataFrame()
    for table in analyte_table_lst:
        sql_statement = """
            SELECT {t}.Concentration_pg AS Conc,
                   COUNT({t}.Concentration_pg) AS {t}
            FROM Sample
            INNER JOIN {t} ON
                {t}.id = Sample.{t}_foreignkey
            WHERE DataSetName = ?
            GROUP BY 1
            ORDER BY 1 ASC;
        """.format(t=table)
        # Bind data_set as a parameter — the original interpolated it into
        # the SQL text, which is injectable.
        df1 = pd.read_sql_query(sql_statement, self.conn, params=(data_set,))
        df1.set_index('Conc', inplace=True)
        df = pd.concat([df, df1], axis=1)
    return df
5、导入数据示例demo代码（从 PostgreSQL 导入并清洗、合并数据）
def import_data_from_psql(user_id):
    """Import data from psql; clean & merge dataframes.

    Returns ``[library, book_subjects, subjects]`` where *library* carries
    the given user's ratings and *book_subjects* has subject names merged
    in, with uninteresting subjects dropped.
    """
    library = pd.read_sql_table(
        'library', con='postgres:///nextbook',
        columns=['book_id', 'title', 'author', 'pub_year',
                 'original_pub_year', 'pages'])
    book_subjects = pd.read_sql_table('book_subjects', con='postgres:///nextbook')
    subjects = pd.read_sql_table(
        'subjects', con='postgres:///nextbook',
        columns=['subject_id', 'subject'])
    # Bind user_id instead of %-formatting it into the SQL text — the
    # original string interpolation was open to SQL injection.
    user_ratings = pd.read_sql_query(
        sql='SELECT book_id, user_id, status, rating '
            'FROM user_books WHERE user_id=%(uid)s',
        con='postgres:///nextbook',
        params={'uid': user_id})

    library = library.merge(user_ratings, how='left', on='book_id')
    # Explicit assignment: fillna(inplace=True) on a column selection is a
    # chained-assignment pattern that newer pandas deprecates.
    library['pages'] = library['pages'].fillna(0)

    # Merge subject names into book_subjects; drop uninteresting subjects.
    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')
    # NOTE(review): the accented entries look like UTF-8 mojibake of
    # "ficción" — kept byte-for-byte since the DB may store them that way.
    delete_values = ["protected daisy", "accessible book", "in library",
                     "overdrive", "large type books",
                     'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n',
                     'lending library']
    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]
    return [library, book_subjects, subjects]
源代码:https://github.com/EmmaOnThursday/next-book/blob/master/app/recommendation_creation.py
6、数据库连接示例demo代码
def test_sql_open_close(self):
    # Test if the IO in the database still work if the connection closed
    # between the writing and reading (as in many real situations).
    with tm.ensure_clean() as name:
        writer = self.connect(name)
        sql.to_sql(self.test_frame3, "test_frame3_legacy", writer,
                   flavor="sqlite", index=False)
        writer.close()

        reader = self.connect(name)
        result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;", reader)
        reader.close()

        tm.assert_frame_equal(self.test_frame3, result)
7、有关时间类型的示例demo代码
def test_datetime(self):
    """Round-trip a datetime column through SQL with both readers."""
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.to_sql('test_datetime', self.conn)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    result = result.drop('index', axis=1)
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information -> sqlite has no native
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    result = result.drop('index', axis=1)
    if self.flavor != 'sqlite':
        tm.assert_frame_equal(result, df)
    else:
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'])
        tm.assert_frame_equal(result, df)
8、时间类型处理的示例demo代码
def test_datetime_NaT(self):
    """A datetime column containing NaT must survive the SQL round-trip."""
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.loc[1, 'A'] = np.nan
    df.to_sql('test_datetime', self.conn, index=False)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information -> sqlite has no native
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'], errors='coerce')
    # Both branches of the original ended with the same comparison.
    tm.assert_frame_equal(result, df)