Asynchronous Replication

Since version 1.11 it's possible to organize asynchronous replication of eXo JCR Repositories.

1 Concept

TODO: "big picture" and Repositories communication description.

Service establishes PUSH style communications where all members located in peer-to-peer network and sends its changes to other. Networking based on "channel" abstraction (based on JGroups). Each Repository are member of Replication if and only if that pointed in configuration of the Replication service.

Communications between servers organized using "star" schema. Each Repository (server) send its changes to all replication members. And each Repository merges all remote and own (local) changes to obtain consistent state.

Replication works in one channel, the channel exists in physical network connection (e.g. VPN, p2p). Physical connection should provide such features as security (encrypting, authentication etc) and speed.

Service architecture:

AsyncReplication.png

It's JGroups based channel. Negotiations model consists of two parts - channel notifications (JGroups events fired to the Replication as events) and channel data packets (data sending by members).

Changes delivery is PUSH based, i.e. owner of changes initiate the process of changes transfer. Exception is merge process which may ask for a remote JCR Node copy via "get export" command. Export will be transfered in PUSH style, i.e. in independent to a get-request packet. Request member should catch the response using special logic.

2 Replication process

Initially each Repository should be identical in content to others. Content it's data container (database and external Value storage) and search index files.

Use Ext backup service or any external too (e.g. files copy, database backup) to organize this.

Replication service can start the process by method synchronize call. On this call the service will setup channel, connects to it and wait for other members connected.

When all configured members have connected to the channel - it's time to start. If no one member connected during the init-wait timeout the process will be canceled.

When second member will be connected to the channel the first higher (priority) member starts timer of last member waiting.

When the last member timer is expired the Replication process can be canceled or continued (without one or more of members) - it's configurable. Cancel by default. First higher member will send Cancel message to other members in this case.

Changes exchange starts on last member connected to the channel event.

Each member sends own changes to other members on that event.

When a member has all members changes (in storage) the merge process starts. After the merge done and applied to a local Repository the member send done message to other.

When the member receive done message from all other members it disconnects from the channel.

Synchronization done.

If any error occurs during transmit or merge the synchronization will be canceled on all members.

See also Backup service

If error occurs after the merge (i.e. on save) it's a problem of this Repository only. Repository should be returned to synchronous state using external tools (e.g. backup).

3 Configuration

Replication configuration contains :

  • Repository name
  • Workspace name
  • Local member priority
  • Other members priorities
  • Channel configuration (incl. local bind IP-address)
  • Changes storage configuration (paths)
<component>
    <type>org.exoplatform.services.jcr.ext.replication.async.AsyncReplication</type>
    <component-plugins>
      
      <component-plugin>
        <name>async-workspace-config</name>
        <set-method>addAsyncWorkspaceConfig</set-method>
        <type>org.exoplatform.services.jcr.ext.replication.async.config.AsyncWorkspaceConfig</type>
        <description>async replication config per workspace</description>
        <init-params>
          <properties-param>
            <name>async-workspace-config</name>
            <property name="repository-name" value="repository"/>
            <property name="workspace-name" value="production"/>
            <property name="priority" value="100"/>
            <property name="other-participants-priority" value="50,25" />
            <property name="bind-ip-address" value="192.168.0.30"/>
            <property name="channel-config" value="TCP(start_port=7700;oob_thread_pool.queue_max_size=100;thread_naming_pattern=cl;use_concurrent_stack=true;oob_thread_pool.rejection_policy=Run;discard_incompatible_packets=true;thread_pool.max_threads=40;oob_thread_pool.enabled=true;oob_thread_pool.max_threads=20;loopback=false;oob_thread_pool.keep_alive_time=5000;thread_pool.queue_enabled=false;oob_thread_pool.queue_enabled=false;max_bundle_size=64000;thread_pool.queue_max_size=100;thread_pool.enabled=true;enable_diagnostics=true;max_bundle_timeout=30;oob_thread_pool.min_threads=8;use_incoming_packet_handler=true;thread_pool.rejection_policy=Run;bind_addr=$bind-ip-address;thread_pool.min_threads=8;thread_pool.keep_alive_time=5000;enable_bundling=true):MPING(timeout=2000;num_initial_members=8;mcast_port=35526;mcast_addr=224.0.0.1):FD(timeout=2000;max_tries=5;shun=true):VERIFY_SUSPECT(timeout=1500):ENCRYPT(encrypt_entire_message=true;sym_init=128;sym_algorithm=AES/ECB/PKCS5Padding;asym_init=512;asym_algorithm=RSA):pbcast.NAKACK(max_xmit_size=60000;print_stability_history_on_failed_xmit=true;use_mcast_xmit=false;gc_lag=0;discard_delivered_msgs=true;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(stability_delay=1000;desired_avg_gossip=50000;max_bytes=8000000):pbcast.GMS(print_local_addr=true;join_timeout=3000;view_bundling=true;join_retry_timeout=2000;shun=true;merge_leader=true;reject_join_from_existing_member=true)"/>
            <property name="channel-name" value="AsyncRepCh1"/>
            <property name="storage-dir" value="../temp/asyncreplication/production"/>
            <property name="wait-all-members" value="300"/>
          </properties-param>
        </init-params>
      </component-plugin>
      
   
    </component-plugins>
  </component>

Additional you need to configure plugin which register listeners from start:

  • Repository name
  • Workspaces name
<external-component-plugins>
    <target-component>org.exoplatform.services.jcr.RepositoryService</target-component>
      <component-plugin>
        <name>register.listener</name>
        <set-method>addPlugin</set-method>
        <type>org.exoplatform.services.jcr.impl.RepositoryChangesListenerRegisterPlugin</type>
        <init-params>
          <value-param>
            <name>repository-name</name>
            <value>repository</value>
          </value-param>
          <value-param>
            <name>workspaces</name>
            <value>production</value>
          </value-param>
          <value-param>
            <name>component-class-name</name>
            <value>org.exoplatform.services.jcr.ext.replication.async.AsyncStartChangesListener</value>
          </value-param>
        </init-params>
      </component-plugin>
  </external-component-plugins>

Add listener only for those workspaces, which specified in Replication configuration.

4 Service API

Asynchronous replication service has a very simple API. Only two methods required: start (synchronize) and check if replication is active.

Only one synchronization process is possible at time.

Each Replication service (server) should be invoked for a synchronization directly via synchronize method.

Methods of org.exoplatform.services.jcr.ext.replication.async.AsyncReplication:

// Initialize synchronization process. Process will use the service configuration.
  public synchronized boolean synchronize() throws RepositoryException, RepositoryConfigurationException, IOException;

  // Tell if synchronization process active (return true if active).
  public boolean isActive();

It's important to understand that on synchronize call the service opens the channel and waits for other members (remote Repositories) connected. But not invoke synchronization on remote servers.

Remote synchronize may be called using any external mechanism (e.g. scheduler).

If you don't have such mechanism and just want run it at moment you can use Executor service.

5 Executor service

Executor service helps to start replication simultaneously at few servers (Repositories). It's external to Asynchronous Replication service application based on REST (HTTP) communications between servers.

Executor uses its configuration to obtain remote Repositories addresses. They should run Executor service too.

But only one instance should be invoked to start whole members group synchronization.

<component>                                                                                                                    
    <type>org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor</type>                                                       
    <init-params>                                                                                                                   
      <object-param>                                                                                                                  
        <name>async-replication-executor-configuration</name>                                                                                            
        <description>async replication nodes</description>                                                                                      
        <object type="org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor$ExecutorConf">                             
          <field name="members">                                                                                                           
            <collection type="java.util.ArrayList">                                                                                       
              <value>                                                                                                                      
                <object type="org.exoplatform.services.jcr.ext.replication.async.executor.Member">
                  <field name="url">                                                                                                          
                    <string>http://root:exo@192.168.0.3:9080</string>
                  </field>                                                      
                  <field name="realmName">                                                                                                          
                    <string>eXo REST services</string> <!-- for EXO Applications -->
                    <!-- string>exo-domain</string --> <!-- for ECM -->                                                                                                     
                  </field>
                </object>                                                                                                                   
              </value>                                                                                                                     
              <value>                                                                                                                      
                <object type="org.exoplatform.services.jcr.ext.replication.async.executor.Member">
                  <field name="url">                                                                                                          
                    <string>http://root:exo@192.168.0.3:10080</string>                                                                                                        
                  </field>
                  <field name="realmName">                                                                                                          
                    <string>eXo REST services</string> <!-- for EXO Applications -->
                    <!-- string>exo-domain</string --> <!-- for ECM -->                                                                                                        
                  </field>                                                                                                                   
                </object>                                                                                                                   
              </value>
            </collection>                                                                                                                 
          </field>                                                                                                                       
        </object>                                                                                                                       
      </object-param>                                                                                                                  
    </init-params>                                                                                                                    
  </component>

Executor usage is very similar to the Replication service. There is one method in org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor:

// Execute synchronization on whole group from configuration.
  public boolean synchronize() throws AsyncReplicationExecutorException;

6 Usage

Example of simple end-user usecase we have implemented in JCR demo application "browser". This sample can be used as base for real-usecases implementations.

Sample will show 'synchronize' button on admin page only if AsyncReplicationExecutor service is configured in configuration. It's not default for eXo JCR demos.

Find details in org.exoplatform.applications.jcr.browser.JCRBrowser bean.

// Obtain AsynReplication service instance from Repository Workspace container.
  WorkspaceContainerFacade container = repository.getWorkspaceContainer(this.repository.getConfiguration()
                                                                                                .getDefaultWorkspaceName());
      
  AsyncReplication areplication = (AsyncReplication) container.getComponent(AsyncReplication.class);

  // start synchronization on this Repository only (only if it is not already started)
  if (!areplication.isActive())  
    areplication.synchronize();
  

  ........
  // or get Executor 
  AsyncReplicationExecutor aexecutor = (AsyncReplicationExecutor) container.getComponent(AsyncReplicationExecutor.class); 

  // and start all members synchronization, 
  // but you have to ask the status on Replication service directly
  if (!areplication.isActive())  
    aexecutor.synchronize();
Recently Modified

Creator: Peter Nedonosko on 02/11/2009
Copyright (c) 2000-2009. Allright reserved - eXo platform SAS
1.6.13286