Rxjava2_Flowable_Sqlite_Android数据库访问实例

下面小编就为大家分享一篇Rxjava2_Flowable_Sqlite_Android数据库访问实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

一、使用Rxjava访问数据库的优点:

1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程

2.随时订阅和取消订阅,而不必再使用回调函数

3.对读取的数据用rxjava进行过滤,流式处理

4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架

(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,

同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)

二、接下来只关注实现过程:

本次实现用rxjava2的Flowable,有背压支持(在不需要背压支持的情况下建议使用Observable)

实现一个稳健的、可灵活切换其他数据库的结构,当然是先定义数据库访问接口,然后根据不同的数据库实现接口的方法

定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)

 public interface DbSource { //String sql = "insert into table_task (tid,startts) values(tid,startts)"; Flowable insertNewTask(int tid, int startts); //String sql = "select * from table_task"; Flowable> getAllTask(); //String sql = "select * from table_task where endts = 0"; Flowable> getRunningTask(); //String sql = "update table_task set isuploadend=isuploadend where tid=tid"; Flowable markUploadEnd(int tid, boolean isuploadend); //String sql = "delete from table_task where tid=tid and endts>0"; Flowable deleteTask(int tid); } 

三、用Android原生的Sqlite实现数据库操作

 public class SimpleDb implements DbSource { private static SimpleDb sqlite; private SqliteHelper sqliteHelper; private SimpleDb(Context context) { this.sqliteHelper = new SqliteHelper(context); } public static synchronized SimpleDb getInstance(Context context) { if (sqlite == null ) sqlite = new SimpleDb(context); return sqlite; } Flowable insertNewTask(int tid, int startts) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 ContentValues values = new ContentValues(); values.put(“tid”, 1); values.put(“startts”,13233); if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1) e.onNext(true); else e.onNext(false); e.onComplete(); } }, BackpressureStrategy.BUFFER); } Flowable> getAllTask() { return Flowable.create(new FlowableOnSubscribe>() { @Override public void subscribe(FlowableEmitter> e) throws Exception { List taskList = new ArrayList<>(); StringBuilder sql = new StringBuilder(100); sql.append("select * from "); sql.append(SqliteHelper.TABLE_NAME_TASK); SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase(); Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null); if (cursor.moveToFirst()) { int count = cursor.getCount(); for (int a = 0; a > getRunningTask() { return Flowable.create(new FlowableOnSubscribe>() { @Override public void subscribe(FlowableEmitter> e) throws Exception { TaskItem item = null; StringBuilder sql = new StringBuilder(100); sql.append("select * from "); sql.append(SqliteHelper.TABLE_NAME_TASK); sql.append(" where endts=0 limit 1"); SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase(); Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null); if (cursor.moveToFirst()) { int count = cursor.getCount(); if (count == 1) { item = new TaskItem(); item.setId(cursor.getInt(0)); item.setTid(cursor.getInt(1)); item.setStartts(cursor.getInt(2)); item.setEndts(cursor.getInt(3)); } } 
cursor.close(); sqLiteDatabase.close(); e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好 e.onComplete(); } }, BackpressureStrategy.BUFFER); } Flowable markUploadEnd(int tid, boolean isuploadend) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 //数据库操作代码 e.onNext(false);//返回结果 e.onComplete();//返回结束 } }, BackpressureStrategy.BUFFER); } Flowable deleteTask(int tid) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法 //数据库操作代码 e.onNext(false);//返回结果 e.onComplete();//返回结束 } }, BackpressureStrategy.BUFFER); } } 

四、同一个接口使用sqlbrite的实现方式

 public class BriteDb implements DbSource { @NonNull protected final BriteDatabase mDatabaseHelper; @NonNull private Function mTaskMapperFunction; @NonNull private Function mPoiMapperFunction; @NonNull private Function mInterestPoiMapperFunction; // Prevent direct instantiation. private BriteDb(@NonNull Context context) { DbHelper dbHelper = new DbHelper(context); SqlBrite sqlBrite = new SqlBrite.Builder().build(); mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io(); mTaskMapperFunction = this::getTask; mPoiMapperFunction = this::getPoi; mInterestPoiMapperFunction = this::getInterestPoi; } @Nullable private static BriteDb INSTANCE; public static BriteDb getInstance(@NonNull Context context) { if (INSTANCE == null) { INSTANCE = new BriteDb(context); } return INSTANCE; } @NonNull private TaskItem getTask(@NonNull Cursor c) { TaskItem item = new TaskItem(); item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID))); item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID))); item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS))); item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS))); return item; } @Override public void insertNewTask(int tid, int startts) { ContentValues values = new ContentValues(); values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid); values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS, startts); mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE); } @Override public Flowable> getAllTask() { String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串 return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql) .mapToList(mTaskMapperFunction) .toFlowable(BackpressureStrategy.BUFFER); } @Override public Flowable> 
getRunningTask() { String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1", PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS); return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0") .mapToOne(cursor -> Optional.fromNullable(mTaskMapperFunction.apply(cursor))) .toFlowable(BackpressureStrategy.BUFFER); } @Override public Flowable markUploadEnd(int tid, boolean isuploadend) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { ContentValues values = new ContentValues(); if(isuploadend) { values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1); } else { values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0); } String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ?"; //String[] selectionArgs = {String.valueOf(tid)}; String selectionArgs = String.valueOf(tid); int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs); if (res > 0) { e.onNext(true);//返回结果 } else { e.onNext(false);//返回结果 } e.onComplete();//返回结束 } }, BackpressureStrategy.BUFFER); } @Override public Flowable deleteTask(int tid) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ? AND "+ PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS + " > 0"; String[] selectionArgs = new String[1]; selectionArgs[0] = String.valueOf(tid); int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs); if (res > 0) { e.onNext(true);//返回结果 } else { e.onNext(false);//返回结果 } e.onComplete();//返回结束 } }, BackpressureStrategy.BUFFER); } } 

五、数据库调用使用方法

使用了lambda简化了表达式进一步简化代码:

简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)

 // Enables Java 8 language features (lambdas, method references).
 // Place this next to defaultConfig, not inside it.
 compileOptions {
     sourceCompatibility JavaVersion.VERSION_1_8
     targetCompatibility JavaVersion.VERSION_1_8
 }

接口调用(获得数据库实例):

 //全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可 public class Injection { public static DbSource getDbSource(Context context) { //choose one of them //return BriteDb.getInstance(context); return SimpleDb.getInstance(context); } } DbSource db = Injection.getInstance(mContext); disposable1 = db.getAllTask() .flatMap(Flowable::fromIterable) .filter(task -> {     //自定义过滤 if (!task.getIsuploadend()) { return true; } else { return false; } }) .subscribe(taskItems -> //这里是使用了lambda简化了表达式 doTaskProcess(taskItems) , throwable -> { throwable.printStackTrace(); },// onCompleted () -> { if (disposable1 != null && !disposable1.isDisposed()) { disposable1.dispose(); } }); disposable1 = db.getRunningTask() .filter(Optional::isPresent) //判断是否为空,为空的就跳过 .map(Optional::get)    //获取到真的参数 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(taskItem -> {     //onNext() //has running task mTid = taskItem.getTid(); }, throwable -> throwable.printStackTrace() //onError() , () -> disposable1.dispose());    //onComplete() disposable1 = db.markUploadEnd(tid, isuploadend) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(status -> {     //onNext() if (status) { //dosomething } }, throwable -> throwable.printStackTrace() //onError() , () -> disposable1.dispose());    //onComplete() disposable1 = db.deleteTask(tid) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(status -> {     //onNext() if (status) { //dosomething } }, throwable -> throwable.printStackTrace() //onError() , () -> disposable1.dispose());    //onComplete() 

以上就是Rxjava2_Flowable_Sqlite_Android数据库访问实例的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » 移动