Asynchronous Replication
Since version 1.11, eXo JCR Repositories can be replicated asynchronously.
1 Concept
TODO: "big picture" and description of communication between Repositories.
The service establishes PUSH-style communication: all members are located in a peer-to-peer network, and each member sends its changes to the others. Networking is built on a "channel" abstraction (based on JGroups). A Repository is a member of the replication if and only if it is listed in the configuration of the Replication service. Communication between servers is organized in a "star" schema: each Repository (server) sends its changes to all replication members, and each Repository merges all remote and its own (local) changes to reach a consistent state. Replication works in a single channel, and the channel exists on top of a physical network connection (e.g. VPN, p2p). The physical connection should provide security (encryption, authentication, etc.) and sufficient speed.
Service architecture:
- The channel is based on JGroups. The negotiation model consists of two parts: channel notifications (JGroups events delivered to the Replication service as events) and channel data packets (data sent by members).
- Change delivery is PUSH-based, i.e. the owner of the changes initiates the transfer. The exception is the merge process, which may request a copy of a remote JCR Node via the "get export" command. The export is also transferred in PUSH style, i.e. in a packet independent of the get request, so the requesting member has to catch the response using special logic.
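The PUSH model described above can be illustrated with a small in-memory sketch. It is not the eXo JCR implementation and does not use JGroups: the class PushChannelSketch, its methods and the change strings are hypothetical names chosen only to show that the owner of a change initiates delivery to every other member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of PUSH delivery; not the eXo JCR implementation. */
class PushChannelSketch {
    // Received change descriptions, keyed by member name.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    /** Registers a member on the (simulated) channel. */
    void join(String member) {
        inboxes.put(member, new ArrayList<>());
    }

    /** The owner of a change initiates the transfer to every other member. */
    void push(String owner, String change) {
        for (Map.Entry<String, List<String>> entry : inboxes.entrySet()) {
            if (!entry.getKey().equals(owner)) {
                entry.getValue().add(owner + ":" + change);
            }
        }
    }

    /** Changes a member has received from the others so far. */
    List<String> received(String member) {
        return inboxes.get(member);
    }

    public static void main(String[] args) {
        PushChannelSketch channel = new PushChannelSketch();
        channel.join("A");
        channel.join("B");
        channel.join("C");
        channel.push("A", "add /node1");
        channel.push("B", "remove /node2");
        // Member C now holds the changes of both A and B and could merge them.
        System.out.println(channel.received("C"));
    }
}
```

In this sketch a member never receives its own changes back, which mirrors the idea that each Repository merges remote changes with its own local ones.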
2 Replication process
Initially, each Repository should be identical in content to the others. Content here means the data container (database and external Value storage) and the search index files.
The Replication service starts the process when its synchronize method is called. On this call the service sets up the channel, connects to it and waits for the other members to connect. Once all configured members have connected to the channel, the exchange can start. If no member connects during the init-wait timeout, the process is canceled. When the second member connects to the channel, the highest-priority member starts a timer that waits for the last member. When this timer expires, the Replication process is either canceled or continued without one or more of the members; this is configurable, and cancel is the default. In the cancel case the highest-priority member sends a Cancel message to the other members.
The exchange of changes starts on the event of the last member connecting to the channel. On that event each member sends its own changes to the other members. When a member has the changes of all members (in storage), the merge process starts. After the merge is done and applied to the local Repository, the member sends a done message to the others. When the member has received a done message from all other members, it disconnects from the channel, and the synchronization is complete. If any error occurs during transmission or merge, the synchronization is canceled on all members.
See also
Backup service
If an error occurs after the merge (i.e. on save), it is a problem of that Repository only. The Repository should be returned to a synchronized state using external tools (e.g. backup).
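The lifecycle of a single member during synchronization can be sketched as a small state model. This is only an illustration under assumptions: the class SyncMemberSketch, its state names and its callback methods are hypothetical and are not part of the Replication service, and real timers, storage and merge logic are left out.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical state model of one replication member; not the eXo JCR implementation. */
class SyncMemberSketch {
    enum State { WAITING_MEMBERS, EXCHANGING, MERGED, DISCONNECTED, CANCELED }

    private final Set<String> others;                        // configured remote members
    private final Set<String> connected = new HashSet<>();   // members seen on the channel
    private final Set<String> changesFrom = new HashSet<>(); // members whose changes are stored
    private final Set<String> doneFrom = new HashSet<>();    // members that reported "done"
    private State state = State.WAITING_MEMBERS;

    SyncMemberSketch(Set<String> others) {
        this.others = new HashSet<>(others);
    }

    State state() {
        return state;
    }

    /** A member connected; the exchange starts once the last configured member is present. */
    void onConnected(String member) {
        if (state != State.WAITING_MEMBERS) return;
        connected.add(member);
        if (connected.containsAll(others)) state = State.EXCHANGING; // own changes are sent here
    }

    /** Changes arrived; the merge runs once every member's changes are in storage. */
    void onChanges(String member) {
        if (state != State.EXCHANGING) return;
        changesFrom.add(member);
        if (changesFrom.containsAll(others)) {
            state = State.MERGED; // merge applied locally, "done" sent to the others
            disconnectIfAllDone();
        }
    }

    /** A "done" message arrived; it may come before the local merge has finished. */
    void onDone(String member) {
        if (state == State.CANCELED || state == State.DISCONNECTED) return;
        doneFrom.add(member);
        disconnectIfAllDone();
    }

    /** A wait timeout (default behaviour) or a transmit/merge error cancels the run. */
    void onCancel() {
        if (state != State.DISCONNECTED) state = State.CANCELED;
    }

    private void disconnectIfAllDone() {
        if (state == State.MERGED && doneFrom.containsAll(others)) state = State.DISCONNECTED;
    }

    public static void main(String[] args) {
        SyncMemberSketch local = new SyncMemberSketch(Set.of("B", "C"));
        local.onConnected("B");
        local.onConnected("C");
        local.onChanges("B");
        local.onChanges("C");
        local.onDone("B");
        local.onDone("C");
        System.out.println(local.state());
    }
}
```

The sketch records "done" messages even before the local merge has finished, because another member may complete its merge earlier than the local one.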
3 Configuration
The Replication configuration contains:
- Repository name
- Workspace name
- Local member priority
- Other members priorities
- Channel configuration (incl. local bind IP-address)
- Changes storage configuration (paths)
<component>
  <type>org.exoplatform.services.jcr.ext.replication.async.AsyncReplication</type>
  <component-plugins>
    <component-plugin>
      <name>async-workspace-config</name>
      <set-method>addAsyncWorkspaceConfig</set-method>
      <type>org.exoplatform.services.jcr.ext.replication.async.config.AsyncWorkspaceConfig</type>
      <description>async replication config per workspace</description>
      <init-params>
        <properties-param>
          <name>async-workspace-config</name>
          <property name="repository-name" value="repository"/>
          <property name="workspace-name" value="production"/>
          <property name="priority" value="100"/>
          <property name="other-participants-priority" value="50,25" />
          <property name="bind-ip-address" value="192.168.0.30"/>
          <property name="channel-config" value="TCP(start_port=7700;oob_thread_pool.queue_max_size=100;thread_naming_pattern=cl;use_concurrent_stack=true;oob_thread_pool.rejection_policy=Run;discard_incompatible_packets=true;thread_pool.max_threads=40;oob_thread_pool.enabled=true;oob_thread_pool.max_threads=20;loopback=false;oob_thread_pool.keep_alive_time=5000;thread_pool.queue_enabled=false;oob_thread_pool.queue_enabled=false;max_bundle_size=64000;thread_pool.queue_max_size=100;thread_pool.enabled=true;enable_diagnostics=true;max_bundle_timeout=30;oob_thread_pool.min_threads=8;use_incoming_packet_handler=true;thread_pool.rejection_policy=Run;bind_addr=$bind-ip-address;thread_pool.min_threads=8;thread_pool.keep_alive_time=5000;enable_bundling=true):MPING(timeout=2000;num_initial_members=8;mcast_port=35526;mcast_addr=224.0.0.1):FD(timeout=2000;max_tries=5;shun=true):VERIFY_SUSPECT(timeout=1500):ENCRYPT(encrypt_entire_message=true;sym_init=128;sym_algorithm=AES/ECB/PKCS5Padding;asym_init=512;asym_algorithm=RSA):pbcast.NAKACK(max_xmit_size=60000;print_stability_history_on_failed_xmit=true;use_mcast_xmit=false;gc_lag=0;discard_delivered_msgs=true;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(stability_delay=1000;desired_avg_gossip=50000;max_bytes=8000000):pbcast.GMS(print_local_addr=true;join_timeout=3000;view_bundling=true;join_retry_timeout=2000;shun=true;merge_leader=true;reject_join_from_existing_member=true)"/>
          <property name="channel-name" value="AsyncRepCh1"/>
          <property name="storage-dir" value="../temp/asyncreplication/production"/>
          <property name="wait-all-members" value="300"/>
        </properties-param>
      </init-params>
    </component-plugin>
  </component-plugins>
</component>
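Two values in the configuration above lend themselves to a short illustration: the comma-separated other-participants-priority list and the $bind-ip-address placeholder inside channel-config. The sketch below assumes that the placeholder is replaced with the value of the bind-ip-address property before the channel is created; this is inferred from the configuration and not confirmed here. The class AsyncConfigSketch and its methods are hypothetical helpers, not part of the service.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers for reading the properties shown above; not eXo JCR code. */
class AsyncConfigSketch {

    /** Assumption: "$bind-ip-address" in channel-config stands for the bind-ip-address property. */
    static String resolveChannelConfig(String channelConfig, String bindIpAddress) {
        return channelConfig.replace("$bind-ip-address", bindIpAddress);
    }

    /** Parses a comma-separated priority list such as "50,25". */
    static List<Integer> parsePriorities(String value) {
        List<Integer> priorities = new ArrayList<>();
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                priorities.add(Integer.parseInt(trimmed));
            }
        }
        return priorities;
    }

    public static void main(String[] args) {
        // Shortened channel-config fragment, for illustration only.
        String fragment = "TCP(start_port=7700;bind_addr=$bind-ip-address)";
        System.out.println(resolveChannelConfig(fragment, "192.168.0.30"));
        System.out.println(parsePriorities("50,25"));
    }
}
```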
The changes listener is registered on the RepositoryService with:
- Repository name
- Workspace names
<external-component-plugins>
  <target-component>org.exoplatform.services.jcr.RepositoryService</target-component>
  <component-plugin>
    <name>register.listener</name>
    <set-method>addPlugin</set-method>
    <type>org.exoplatform.services.jcr.impl.RepositoryChangesListenerRegisterPlugin</type>
    <init-params>
      <value-param>
        <name>repository-name</name>
        <value>repository</value>
      </value-param>
      <value-param>
        <name>workspaces</name>
        <value>production</value>
      </value-param>
      <value-param>
        <name>component-class-name</name>
        <value>org.exoplatform.services.jcr.ext.replication.async.AsyncStartChangesListener</value>
      </value-param>
    </init-params>
  </component-plugin>
</external-component-plugins>
4 Service API
The Asynchronous Replication service has a very simple API. Only two methods are required: one to start synchronization (synchronize) and one to check whether replication is active. Each Replication service (server) has to be invoked for synchronization directly via its synchronize method.
Methods of org.exoplatform.services.jcr.ext.replication.async.AsyncReplication:
  // Initialize the synchronization process. The process will use the service configuration.
  public synchronized boolean synchronize() throws RepositoryException, RepositoryConfigurationException, IOException;

  // Tell whether the synchronization process is active (returns true if active).
  public boolean isActive();
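Because the API exposes only a start method and an activity check, a caller that needs to know when synchronization has finished may have to poll isActive. The following is a minimal sketch of such polling, not part of the service: the class AwaitSyncSketch, the method awaitInactive and the timeout values are hypothetical, and the activity check is passed in as a BooleanSupplier so that the example runs without eXo JCR on the classpath.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; with the real service one could pass areplication::isActive. */
class AwaitSyncSketch {

    /**
     * Polls the activity check until it reports inactive or the timeout elapses.
     * Returns true if the process became inactive in time, false otherwise.
     */
    static boolean awaitInactive(BooleanSupplier isActive, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (isActive.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // still active after the timeout
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated service that stays active for roughly 50 ms.
        long end = System.currentTimeMillis() + 50;
        boolean finished = awaitInactive(() -> System.currentTimeMillis() < end, 1000, 10);
        System.out.println("finished in time: " + finished);
    }
}
```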
5 Executor service
The Executor service helps to start replication simultaneously on several servers (Repositories). It is an application external to the Asynchronous Replication service and is based on REST (HTTP) communication between servers. The Executor uses its configuration to obtain the addresses of the remote Repositories, which should run the Executor service too. However, only one instance should be invoked to start synchronization of the whole member group.
<component>
  <type>org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor</type>
  <init-params>
    <object-param>
      <name>async-replication-executor-configuration</name>
      <description>async replication nodes</description>
      <object type="org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor$ExecutorConf">
        <field name="members">
          <collection type="java.util.ArrayList">
            <value>
              <object type="org.exoplatform.services.jcr.ext.replication.async.executor.Member">
                <field name="url">
                  <string>http://root:exo@192.168.0.3:9080</string>
                </field>
                <field name="realmName">
                  <string>eXo REST services</string> <!-- for EXO Applications -->
                  <!-- string>exo-domain</string --> <!-- for ECM -->
                </field>
              </object>
            </value>
            <value>
              <object type="org.exoplatform.services.jcr.ext.replication.async.executor.Member">
                <field name="url">
                  <string>http://root:exo@192.168.0.3:10080</string>
                </field>
                <field name="realmName">
                  <string>eXo REST services</string> <!-- for EXO Applications -->
                  <!-- string>exo-domain</string --> <!-- for ECM -->
                </field>
              </object>
            </value>
          </collection>
        </field>
      </object>
    </object-param>
  </init-params>
</component>
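Method of org.exoplatform.services.jcr.ext.replication.async.executor.AsyncReplicationExecutor:
  // Execute synchronization on the whole group from the configuration.
  public boolean synchronize() throws AsyncReplicationExecutorException;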
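Each member url in the Executor configuration carries the credentials, host and port of a remote server. As an illustration of how such a value decomposes, the sketch below splits it with the standard java.net.URI class. The class ExecutorMemberUrlSketch is hypothetical; how the Executor itself parses the URL and which REST resources it calls are not described here and are not assumed.

```java
import java.net.URI;

/** Hypothetical holder for one configured member URL; not the Executor's own Member class. */
class ExecutorMemberUrlSketch {
    final String baseUrl;  // scheme, host and port without credentials
    final String user;
    final String password;

    ExecutorMemberUrlSketch(String configuredUrl) {
        URI uri = URI.create(configuredUrl);

        // The user-info part has the form "user:password" in the configuration above.
        String userInfo = uri.getUserInfo();
        String[] credentials = userInfo == null ? new String[] {"", ""} : userInfo.split(":", 2);
        this.user = credentials[0];
        this.password = credentials.length > 1 ? credentials[1] : "";

        // getPort() returns -1 when the URL has no explicit port.
        String port = uri.getPort() == -1 ? "" : ":" + uri.getPort();
        this.baseUrl = uri.getScheme() + "://" + uri.getHost() + port;
    }

    public static void main(String[] args) {
        ExecutorMemberUrlSketch member = new ExecutorMemberUrlSketch("http://root:exo@192.168.0.3:9080");
        System.out.println(member.baseUrl + " as user " + member.user);
    }
}
```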
6 Usage
A simple end-user use case is implemented in the JCR demo application "browser". This sample can be used as a base for real use-case implementations. See the org.exoplatform.applications.jcr.browser.JCRBrowser bean for details.
  // Obtain the AsyncReplication service instance from the Repository Workspace container.
  WorkspaceContainerFacade container =
      repository.getWorkspaceContainer(this.repository.getConfiguration().getDefaultWorkspaceName());
  AsyncReplication areplication = (AsyncReplication) container.getComponent(AsyncReplication.class);

  // Start synchronization on this Repository only (only if it is not already started).
  if (!areplication.isActive())
    areplication.synchronize();

  ........

  // Or get the Executor
  AsyncReplicationExecutor aexecutor = (AsyncReplicationExecutor) container.getComponent(AsyncReplicationExecutor.class);

  // and start synchronization of all members,
  // but you have to ask the Replication service directly for the status.
  if (!areplication.isActive())
    aexecutor.synchronize();