Как лучше всего реализовать Rx Observable, который будет передавать потокозащищенный List нескольким подписчикам?

Моя задача:

Устройство должно писать пройденный маршрут на сервер по HTTP. При недоступности сервера/интернета/связи - писать маршрут во внутреннюю базу данных телефона. При восстановлении соединения - дописать недостающие точки на сервер.

Структура и модель приложения, которую я пытаюсь применить (первое приложение на Андроид, как-никак):

CurrentLocationHolder
- хранит текущее местоположение, буфер местоположений, Subject, подписанный на заполнение буфера

LocationListener
- получает новое местоположение, сохраняет его в свойство CurrentLocationHolder.getInstance().currentLocation, и добавляет его в буфер, т.о. точки, которые ещё не были отправлены на сервер, но и не сохранены в БД, хранятся в (List)CurrentLocationHolder.getInstance().locationBuffer

HTTPManager (менеджер HTTP-запросов)
- управляет очередью запросов к серверу. подписывается на факт наполнения буфера, получает его содержимое, и фигачит его на сервер;
- по onError (сетевая ошибка, потеря интернета, ответа от сервера) меняет свойство NetStateListener.Connected на false, прекращает отправку запросов на сервер;

NetStateListener - это простой пингер, имеющий Subject, подписан на изменения свойства Connected (false - начинает опрашивать сервер. Как только сервак ответил - меняет свойство Connected на true.
HTTPManager следит за ним, и соответственно реагирует.

DatabaseManager подписан на факт onError HTTPManager'a, получает от onError список не дошедших до сервера точек, пишет их в БД. DatabaseManager подписан так же и на NetStateListener.Connected (когда false - пишет буфер местоположений в БД, когда true - начинает по-тихоньку подкидывать в буфер недоставленные точки местоположения из БД)

Подскажите, правильно ли я описал модель приложения? Всё-таки, первое приложение на Android...

Пока реализовал только вот что:
Объект-синглтон, хранящий текущее местоположение, буфер местоположений, Rx - субъект locationBufferFull
public class CurrentLocationHolder {
    //Буфер предыдущих местоположений. При наступлении порога locationBuffer.size() его элементы 
    //должны передаваться подписчикам
    //я так понял, что буфер должен быть потокосогласованным (синхронным). поправьте, если я не прав...
    private List<Location> locationBuffer = Collections.synchronizedList(new ArrayList<>());
   
    //это свойство хранит текущее местоположение
    private Location currentLocation;
    
    //создаём субъект подписки (нафига - я ещё до конца не понимаю, видимо для того, чтобы вызвать метод onNext() )
    private final PublishSubject<List<LocationPoint>> locationBufferFull = PublishSubject.create();

    //Этот метод, если я правильно понял, возвращает Observable при регистрации подписчиков из 
    //внешнего кода. поясните, плз, для чего тут emitCurrentValue?
    public Observable<List<LocationPoint>> observeLocationBufferFull(boolean emitCurrentValue) {
        return emitCurrentValue ? locationBufferFull.startWith(locationBuffer) : locationBufferFull;
    }

    //сеттер для свойства this.currentLocation
    public void setLocation(Location point) {
        this.currentLocation = point;

        //добавляем местоположение в буфер
        locationBuffer.add(point);

        //если размер буфера превышает порог, оповещаем подписчиков о заполнении буфера, и передаем 
        //содержимое буфера подписчикам. 
        if (locationBuffer.size() >= 10) {
            //может быть, рационально вызвать onNext() с аргументом - копией буфера?
            locationBufferChanged.onNext(this.locationBuffer);
        }
        //после обработки подписчиками, очищаем буфер
        locationBuffer.clear();
    }
}


Код подписчика:
public class DatabaseManager {
    //поле для хранения объекта подписки (оно нужно вообще? и так кода наплодил...)
    private Subscription locationBufferSubscription;

    private static DatabaseManager instance;
    
    public static void InitInstance() {
        if (instance == null) {
            instance = new DatabaseManager();
        //при инициализации экземпляра класса подписываемся на заполнение locationBuffer 
        instance.locationBufferSubscription = 
            CurrentLocationHolder.getInstance().observeLocationBufferFull()
                .subscribe(listOfLocations -> {
                    //пишем содержимое буфера в базу
                    ActiveAndroid.beginTransaction();
                    try {
                        for (int i = 0; i < locArray.size(); i++) {
                            listOfLocations.get(i).save();
                        }
                        ActiveAndroid.setTransactionSuccessful();
                    } finally {
                        ActiveAndroid.endTransaction();
                    }
            });
        }
    }
}


В этот код ещё добавится Retrofit, который будет писать местоположение на сервер... Подскажите, как мне проще всего передавать управление на обработку буфера DatabaseManager'у, если retrofit выдаст onError? И как в этом случае избежать потери данных? Ведь они должны все гарантированно попасть в базу, а при восстановлении интернет-соединения все гарантированно попасть на сервер...

И мелкие вопросы:

  1. Будет ли передаваемый locationBuffer неизменным для всех подписчиков?
  2. locationBuffer.clear(); выполнится только после того, как его обработают все подписчики? Метод locationBuffer.add(point) должен ждать в блокировке, пока не исполнится locationBuffer.clear();
  3. Может, как-то проще такое поведение реализовать? Я, наверное, пишу много ненужного кода на Rx


Сор, если вопросы и реализация - тупые, я с Java и AndroidSDK познакомился чуть больше недели назад. В философию RxJava ещё не въехал, но задачу необходимо решить срочно, времени на раскачку нет совсем... Помогите, плз!
  • Вопрос задан
  • 426 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы