1package DBIx::Class::Storage::DBI::Replicated; 2 3BEGIN { 4 use Carp::Clan qw/^DBIx::Class/; 5 use DBIx::Class; 6 croak('The following modules are required for Replication ' . DBIx::Class::Optional::Dependencies->req_missing_for ('replicated') ) 7 unless DBIx::Class::Optional::Dependencies->req_ok_for ('replicated'); 8} 9 10use Moose; 11use DBIx::Class::Storage::DBI; 12use DBIx::Class::Storage::DBI::Replicated::Pool; 13use DBIx::Class::Storage::DBI::Replicated::Balancer; 14use DBIx::Class::Storage::DBI::Replicated::Types qw/BalancerClassNamePart DBICSchema DBICStorageDBI/; 15use MooseX::Types::Moose qw/ClassName HashRef Object/; 16use Scalar::Util 'reftype'; 17use Hash::Merge; 18use List::Util qw/min max/; 19 20use namespace::clean -except => 'meta'; 21 22=head1 NAME 23 24DBIx::Class::Storage::DBI::Replicated - BETA Replicated database support 25 26=head1 SYNOPSIS 27 28The Following example shows how to change an existing $schema to a replicated 29storage type, add some replicated (read-only) databases, and perform reporting 30tasks. 31 32You should set the 'storage_type attribute to a replicated type. You should 33also define your arguments, such as which balancer you want and any arguments 34that the Pool object should get. 35 36 my $schema = Schema::Class->clone; 37 $schema->storage_type( ['::DBI::Replicated', {balancer=>'::Random'}] ); 38 $schema->connection(...); 39 40Next, you need to add in the Replicants. Basically this is an array of 41arrayrefs, where each arrayref is database connect information. Think of these 42arguments as what you'd pass to the 'normal' $schema->connect method. 43 44 $schema->storage->connect_replicants( 45 [$dsn1, $user, $pass, \%opts], 46 [$dsn2, $user, $pass, \%opts], 47 [$dsn3, $user, $pass, \%opts], 48 ); 49 50Now, just use the $schema as you normally would. Automatically all reads will 51be delegated to the replicants, while writes to the master. 52 53 $schema->resultset('Source')->search({name=>'etc'}); 54 55You can force a given query to use a particular storage using the search 56attribute 'force_pool'. For example: 57 58 my $RS = $schema->resultset('Source')->search(undef, {force_pool=>'master'}); 59 60Now $RS will force everything (both reads and writes) to use whatever was setup 61as the master storage. 'master' is hardcoded to always point to the Master, 62but you can also use any Replicant name. Please see: 63L<DBIx::Class::Storage::DBI::Replicated::Pool> and the replicants attribute for more. 64 65Also see transactions and L</execute_reliably> for alternative ways to 66force read traffic to the master. In general, you should wrap your statements 67in a transaction when you are reading and writing to the same tables at the 68same time, since your replicants will often lag a bit behind the master. 69 70See L<DBIx::Class::Storage::DBI::Replicated::Instructions> for more help and 71walkthroughs. 72 73=head1 DESCRIPTION 74 75Warning: This class is marked BETA. This has been running a production 76website using MySQL native replication as its backend and we have some decent 77test coverage but the code hasn't yet been stressed by a variety of databases. 78Individual DBs may have quirks we are not aware of. Please use this in first 79development and pass along your experiences/bug fixes. 80 81This class implements replicated data store for DBI. Currently you can define 82one master and numerous slave database connections. All write-type queries 83(INSERT, UPDATE, DELETE and even LAST_INSERT_ID) are routed to master 84database, all read-type queries (SELECTs) go to the slave database. 85 86Basically, any method request that L<DBIx::Class::Storage::DBI> would normally 87handle gets delegated to one of the two attributes: L</read_handler> or to 88L</write_handler>. Additionally, some methods need to be distributed 89to all existing storages. This way our storage class is a drop in replacement 90for L<DBIx::Class::Storage::DBI>. 91 92Read traffic is spread across the replicants (slaves) occurring to a user 93selected algorithm. The default algorithm is random weighted. 94 95=head1 NOTES 96 97The consistency between master and replicants is database specific. The Pool 98gives you a method to validate its replicants, removing and replacing them 99when they fail/pass predefined criteria. Please make careful use of the ways 100to force a query to run against Master when needed. 101 102=head1 REQUIREMENTS 103 104Replicated Storage has additional requirements not currently part of 105L<DBIx::Class>. See L<DBIx::Class::Optional::Dependencies> for more details. 106 107=head1 ATTRIBUTES 108 109This class defines the following attributes. 110 111=head2 schema 112 113The underlying L<DBIx::Class::Schema> object this storage is attaching 114 115=cut 116 117has 'schema' => ( 118 is=>'rw', 119 isa=>DBICSchema, 120 weak_ref=>1, 121 required=>1, 122); 123 124=head2 pool_type 125 126Contains the classname which will instantiate the L</pool> object. Defaults 127to: L<DBIx::Class::Storage::DBI::Replicated::Pool>. 128 129=cut 130 131has 'pool_type' => ( 132 is=>'rw', 133 isa=>ClassName, 134 default=>'DBIx::Class::Storage::DBI::Replicated::Pool', 135 handles=>{ 136 'create_pool' => 'new', 137 }, 138); 139 140=head2 pool_args 141 142Contains a hashref of initialized information to pass to the Balancer object. 143See L<DBIx::Class::Storage::DBI::Replicated::Pool> for available arguments. 144 145=cut 146 147has 'pool_args' => ( 148 is=>'rw', 149 isa=>HashRef, 150 lazy=>1, 151 default=>sub { {} }, 152); 153 154 155=head2 balancer_type 156 157The replication pool requires a balance class to provider the methods for 158choose how to spread the query load across each replicant in the pool. 159 160=cut 161 162has 'balancer_type' => ( 163 is=>'rw', 164 isa=>BalancerClassNamePart, 165 coerce=>1, 166 required=>1, 167 default=> 'DBIx::Class::Storage::DBI::Replicated::Balancer::First', 168 handles=>{ 169 'create_balancer' => 'new', 170 }, 171); 172 173=head2 balancer_args 174 175Contains a hashref of initialized information to pass to the Balancer object. 176See L<DBIx::Class::Storage::DBI::Replicated::Balancer> for available arguments. 177 178=cut 179 180has 'balancer_args' => ( 181 is=>'rw', 182 isa=>HashRef, 183 lazy=>1, 184 required=>1, 185 default=>sub { {} }, 186); 187 188=head2 pool 189 190Is a <DBIx::Class::Storage::DBI::Replicated::Pool> or derived class. This is a 191container class for one or more replicated databases. 192 193=cut 194 195has 'pool' => ( 196 is=>'ro', 197 isa=>'DBIx::Class::Storage::DBI::Replicated::Pool', 198 lazy_build=>1, 199 handles=>[qw/ 200 connect_replicants 201 replicants 202 has_replicants 203 /], 204); 205 206=head2 balancer 207 208Is a <DBIx::Class::Storage::DBI::Replicated::Balancer> or derived class. This 209is a class that takes a pool (<DBIx::Class::Storage::DBI::Replicated::Pool>) 210 211=cut 212 213has 'balancer' => ( 214 is=>'rw', 215 isa=>'DBIx::Class::Storage::DBI::Replicated::Balancer', 216 lazy_build=>1, 217 handles=>[qw/auto_validate_every/], 218); 219 220=head2 master 221 222The master defines the canonical state for a pool of connected databases. All 223the replicants are expected to match this databases state. Thus, in a classic 224Master / Slaves distributed system, all the slaves are expected to replicate 225the Master's state as quick as possible. This is the only database in the 226pool of databases that is allowed to handle write traffic. 227 228=cut 229 230has 'master' => ( 231 is=> 'ro', 232 isa=>DBICStorageDBI, 233 lazy_build=>1, 234); 235 236=head1 ATTRIBUTES IMPLEMENTING THE DBIx::Storage::DBI INTERFACE 237 238The following methods are delegated all the methods required for the 239L<DBIx::Class::Storage::DBI> interface. 240 241=head2 read_handler 242 243Defines an object that implements the read side of L<BIx::Class::Storage::DBI>. 244 245=cut 246 247has 'read_handler' => ( 248 is=>'rw', 249 isa=>Object, 250 lazy_build=>1, 251 handles=>[qw/ 252 select 253 select_single 254 columns_info_for 255 _dbh_columns_info_for 256 _select 257 /], 258); 259 260=head2 write_handler 261 262Defines an object that implements the write side of L<BIx::Class::Storage::DBI>, 263as well as methods that don't write or read that can be called on only one 264storage, methods that return a C<$dbh>, and any methods that don't make sense to 265run on a replicant. 266 267=cut 268 269has 'write_handler' => ( 270 is=>'ro', 271 isa=>Object, 272 lazy_build=>1, 273 handles=>[qw/ 274 on_connect_do 275 on_disconnect_do 276 on_connect_call 277 on_disconnect_call 278 connect_info 279 _connect_info 280 throw_exception 281 sql_maker 282 sqlt_type 283 create_ddl_dir 284 deployment_statements 285 datetime_parser 286 datetime_parser_type 287 build_datetime_parser 288 last_insert_id 289 insert 290 insert_bulk 291 update 292 delete 293 dbh 294 txn_begin 295 txn_do 296 txn_commit 297 txn_rollback 298 txn_scope_guard 299 sth 300 deploy 301 with_deferred_fk_checks 302 dbh_do 303 reload_row 304 with_deferred_fk_checks 305 _prep_for_execute 306 307 backup 308 is_datatype_numeric 309 _count_select 310 _subq_count_select 311 _subq_update_delete 312 svp_rollback 313 svp_begin 314 svp_release 315 relname_to_table_alias 316 _straight_join_to_node 317 _dbh_last_insert_id 318 _fix_bind_params 319 _default_dbi_connect_attributes 320 _dbi_connect_info 321 auto_savepoint 322 _sqlt_version_ok 323 _query_end 324 bind_attribute_by_data_type 325 transaction_depth 326 _dbh 327 _select_args 328 _dbh_execute_array 329 _sql_maker_args 330 _sql_maker 331 _query_start 332 _sqlt_version_error 333 _per_row_update_delete 334 _dbh_begin_work 335 _dbh_execute_inserts_with_no_binds 336 _select_args_to_query 337 _svp_generate_name 338 _multipk_update_delete 339 source_bind_attributes 340 _normalize_connect_info 341 _parse_connect_do 342 _dbh_commit 343 _execute_array 344 _placeholders_supported 345 _verify_pid 346 savepoints 347 _sqlt_minimum_version 348 _sql_maker_opts 349 _conn_pid 350 _typeless_placeholders_supported 351 _conn_tid 352 _dbh_autocommit 353 _native_data_type 354 _get_dbh 355 sql_maker_class 356 _dbh_rollback 357 _adjust_select_args_for_complex_prefetch 358 _resolve_ident_sources 359 _resolve_column_info 360 _prune_unused_joins 361 _strip_cond_qualifiers 362 _parse_order_by 363 _resolve_aliastypes_from_select_args 364 _execute 365 _do_query 366 _dbh_sth 367 _dbh_execute 368 /], 369); 370 371has _master_connect_info_opts => 372 (is => 'rw', isa => HashRef, default => sub { {} }); 373 374=head2 around: connect_info 375 376Preserves master's C<connect_info> options (for merging with replicants.) 377Also sets any Replicated-related options from connect_info, such as 378C<pool_type>, C<pool_args>, C<balancer_type> and C<balancer_args>. 379 380=cut 381 382around connect_info => sub { 383 my ($next, $self, $info, @extra) = @_; 384 385 my $wantarray = wantarray; 386 387 my $merge = Hash::Merge->new('LEFT_PRECEDENT'); 388 389 my %opts; 390 for my $arg (@$info) { 391 next unless (reftype($arg)||'') eq 'HASH'; 392 %opts = %{ $merge->merge($arg, \%opts) }; 393 } 394 delete $opts{dsn}; 395 396 if (@opts{qw/pool_type pool_args/}) { 397 $self->pool_type(delete $opts{pool_type}) 398 if $opts{pool_type}; 399 400 $self->pool_args( 401 $merge->merge((delete $opts{pool_args} || {}), $self->pool_args) 402 ); 403 404 $self->pool($self->_build_pool) 405 if $self->pool; 406 } 407 408 if (@opts{qw/balancer_type balancer_args/}) { 409 $self->balancer_type(delete $opts{balancer_type}) 410 if $opts{balancer_type}; 411 412 $self->balancer_args( 413 $merge->merge((delete $opts{balancer_args} || {}), $self->balancer_args) 414 ); 415 416 $self->balancer($self->_build_balancer) 417 if $self->balancer; 418 } 419 420 $self->_master_connect_info_opts(\%opts); 421 422 my (@res, $res); 423 if ($wantarray) { 424 @res = $self->$next($info, @extra); 425 } else { 426 $res = $self->$next($info, @extra); 427 } 428 429 # Make sure master is blessed into the correct class and apply role to it. 430 my $master = $self->master; 431 $master->_determine_driver; 432 Moose::Meta::Class->initialize(ref $master); 433 434 DBIx::Class::Storage::DBI::Replicated::WithDSN->meta->apply($master); 435 436 # link pool back to master 437 $self->pool->master($master); 438 439 $wantarray ? @res : $res; 440}; 441 442=head1 METHODS 443 444This class defines the following methods. 445 446=head2 BUILDARGS 447 448L<DBIx::Class::Schema> when instantiating its storage passed itself as the 449first argument. So we need to massage the arguments a bit so that all the 450bits get put into the correct places. 451 452=cut 453 454sub BUILDARGS { 455 my ($class, $schema, $storage_type_args, @args) = @_; 456 457 return { 458 schema=>$schema, 459 %$storage_type_args, 460 @args 461 } 462} 463 464=head2 _build_master 465 466Lazy builder for the L</master> attribute. 467 468=cut 469 470sub _build_master { 471 my $self = shift @_; 472 my $master = DBIx::Class::Storage::DBI->new($self->schema); 473 $master 474} 475 476=head2 _build_pool 477 478Lazy builder for the L</pool> attribute. 479 480=cut 481 482sub _build_pool { 483 my $self = shift @_; 484 $self->create_pool(%{$self->pool_args}); 485} 486 487=head2 _build_balancer 488 489Lazy builder for the L</balancer> attribute. This takes a Pool object so that 490the balancer knows which pool it's balancing. 491 492=cut 493 494sub _build_balancer { 495 my $self = shift @_; 496 $self->create_balancer( 497 pool=>$self->pool, 498 master=>$self->master, 499 %{$self->balancer_args}, 500 ); 501} 502 503=head2 _build_write_handler 504 505Lazy builder for the L</write_handler> attribute. The default is to set this to 506the L</master>. 507 508=cut 509 510sub _build_write_handler { 511 return shift->master; 512} 513 514=head2 _build_read_handler 515 516Lazy builder for the L</read_handler> attribute. The default is to set this to 517the L</balancer>. 518 519=cut 520 521sub _build_read_handler { 522 return shift->balancer; 523} 524 525=head2 around: connect_replicants 526 527All calls to connect_replicants needs to have an existing $schema tacked onto 528top of the args, since L<DBIx::Storage::DBI> needs it, and any C<connect_info> 529options merged with the master, with replicant opts having higher priority. 530 531=cut 532 533around connect_replicants => sub { 534 my ($next, $self, @args) = @_; 535 536 for my $r (@args) { 537 $r = [ $r ] unless reftype $r eq 'ARRAY'; 538 539 $self->throw_exception('coderef replicant connect_info not supported') 540 if ref $r->[0] && reftype $r->[0] eq 'CODE'; 541 542# any connect_info options? 543 my $i = 0; 544 $i++ while $i < @$r && (reftype($r->[$i])||'') ne 'HASH'; 545 546# make one if none 547 $r->[$i] = {} unless $r->[$i]; 548 549# merge if two hashes 550 my @hashes = @$r[$i .. $#{$r}]; 551 552 $self->throw_exception('invalid connect_info options') 553 if (grep { reftype($_) eq 'HASH' } @hashes) != @hashes; 554 555 $self->throw_exception('too many hashrefs in connect_info') 556 if @hashes > 2; 557 558 my $merge = Hash::Merge->new('LEFT_PRECEDENT'); 559 my %opts = %{ $merge->merge(reverse @hashes) }; 560 561# delete them 562 splice @$r, $i+1, ($#{$r} - $i), (); 563 564# make sure master/replicants opts don't clash 565 my %master_opts = %{ $self->_master_connect_info_opts }; 566 if (exists $opts{dbh_maker}) { 567 delete @master_opts{qw/dsn user password/}; 568 } 569 delete $master_opts{dbh_maker}; 570 571# merge with master 572 %opts = %{ $merge->merge(\%opts, \%master_opts) }; 573 574# update 575 $r->[$i] = \%opts; 576 } 577 578 $self->$next($self->schema, @args); 579}; 580 581=head2 all_storages 582 583Returns an array of of all the connected storage backends. The first element 584in the returned array is the master, and the remainings are each of the 585replicants. 586 587=cut 588 589sub all_storages { 590 my $self = shift @_; 591 return grep {defined $_ && blessed $_} ( 592 $self->master, 593 values %{ $self->replicants }, 594 ); 595} 596 597=head2 execute_reliably ($coderef, ?@args) 598 599Given a coderef, saves the current state of the L</read_handler>, forces it to 600use reliable storage (e.g. sets it to the master), executes a coderef and then 601restores the original state. 602 603Example: 604 605 my $reliably = sub { 606 my $name = shift @_; 607 $schema->resultset('User')->create({name=>$name}); 608 my $user_rs = $schema->resultset('User')->find({name=>$name}); 609 return $user_rs; 610 }; 611 612 my $user_rs = $schema->storage->execute_reliably($reliably, 'John'); 613 614Use this when you must be certain of your database state, such as when you just 615inserted something and need to get a resultset including it, etc. 616 617=cut 618 619sub execute_reliably { 620 my ($self, $coderef, @args) = @_; 621 622 unless( ref $coderef eq 'CODE') { 623 $self->throw_exception('Second argument must be a coderef'); 624 } 625 626 ##Get copy of master storage 627 my $master = $self->master; 628 629 ##Get whatever the current read hander is 630 my $current = $self->read_handler; 631 632 ##Set the read handler to master 633 $self->read_handler($master); 634 635 ## do whatever the caller needs 636 my @result; 637 my $want_array = wantarray; 638 639 eval { 640 if($want_array) { 641 @result = $coderef->(@args); 642 } elsif(defined $want_array) { 643 ($result[0]) = ($coderef->(@args)); 644 } else { 645 $coderef->(@args); 646 } 647 }; 648 649 ##Reset to the original state 650 $self->read_handler($current); 651 652 ##Exception testing has to come last, otherwise you might leave the 653 ##read_handler set to master. 654 655 if($@) { 656 $self->throw_exception("coderef returned an error: $@"); 657 } else { 658 return $want_array ? @result : $result[0]; 659 } 660} 661 662=head2 set_reliable_storage 663 664Sets the current $schema to be 'reliable', that is all queries, both read and 665write are sent to the master 666 667=cut 668 669sub set_reliable_storage { 670 my $self = shift @_; 671 my $schema = $self->schema; 672 my $write_handler = $self->schema->storage->write_handler; 673 674 $schema->storage->read_handler($write_handler); 675} 676 677=head2 set_balanced_storage 678 679Sets the current $schema to be use the </balancer> for all reads, while all 680writes are sent to the master only 681 682=cut 683 684sub set_balanced_storage { 685 my $self = shift @_; 686 my $schema = $self->schema; 687 my $balanced_handler = $self->schema->storage->balancer; 688 689 $schema->storage->read_handler($balanced_handler); 690} 691 692=head2 connected 693 694Check that the master and at least one of the replicants is connected. 695 696=cut 697 698sub connected { 699 my $self = shift @_; 700 return 701 $self->master->connected && 702 $self->pool->connected_replicants; 703} 704 705=head2 ensure_connected 706 707Make sure all the storages are connected. 708 709=cut 710 711sub ensure_connected { 712 my $self = shift @_; 713 foreach my $source ($self->all_storages) { 714 $source->ensure_connected(@_); 715 } 716} 717 718=head2 limit_dialect 719 720Set the limit_dialect for all existing storages 721 722=cut 723 724sub limit_dialect { 725 my $self = shift @_; 726 foreach my $source ($self->all_storages) { 727 $source->limit_dialect(@_); 728 } 729 return $self->master->quote_char; 730} 731 732=head2 quote_char 733 734Set the quote_char for all existing storages 735 736=cut 737 738sub quote_char { 739 my $self = shift @_; 740 foreach my $source ($self->all_storages) { 741 $source->quote_char(@_); 742 } 743 return $self->master->quote_char; 744} 745 746=head2 name_sep 747 748Set the name_sep for all existing storages 749 750=cut 751 752sub name_sep { 753 my $self = shift @_; 754 foreach my $source ($self->all_storages) { 755 $source->name_sep(@_); 756 } 757 return $self->master->name_sep; 758} 759 760=head2 set_schema 761 762Set the schema object for all existing storages 763 764=cut 765 766sub set_schema { 767 my $self = shift @_; 768 foreach my $source ($self->all_storages) { 769 $source->set_schema(@_); 770 } 771} 772 773=head2 debug 774 775set a debug flag across all storages 776 777=cut 778 779sub debug { 780 my $self = shift @_; 781 if(@_) { 782 foreach my $source ($self->all_storages) { 783 $source->debug(@_); 784 } 785 } 786 return $self->master->debug; 787} 788 789=head2 debugobj 790 791set a debug object 792 793=cut 794 795sub debugobj { 796 my $self = shift @_; 797 return $self->master->debugobj(@_); 798} 799 800=head2 debugfh 801 802set a debugfh object 803 804=cut 805 806sub debugfh { 807 my $self = shift @_; 808 return $self->master->debugfh(@_); 809} 810 811=head2 debugcb 812 813set a debug callback 814 815=cut 816 817sub debugcb { 818 my $self = shift @_; 819 return $self->master->debugcb(@_); 820} 821 822=head2 disconnect 823 824disconnect everything 825 826=cut 827 828sub disconnect { 829 my $self = shift @_; 830 foreach my $source ($self->all_storages) { 831 $source->disconnect(@_); 832 } 833} 834 835=head2 cursor_class 836 837set cursor class on all storages, or return master's 838 839=cut 840 841sub cursor_class { 842 my ($self, $cursor_class) = @_; 843 844 if ($cursor_class) { 845 $_->cursor_class($cursor_class) for $self->all_storages; 846 } 847 $self->master->cursor_class; 848} 849 850=head2 cursor 851 852set cursor class on all storages, or return master's, alias for L</cursor_class> 853above. 854 855=cut 856 857sub cursor { 858 my ($self, $cursor_class) = @_; 859 860 if ($cursor_class) { 861 $_->cursor($cursor_class) for $self->all_storages; 862 } 863 $self->master->cursor; 864} 865 866=head2 unsafe 867 868sets the L<DBIx::Class::Storage::DBI/unsafe> option on all storages or returns 869master's current setting 870 871=cut 872 873sub unsafe { 874 my $self = shift; 875 876 if (@_) { 877 $_->unsafe(@_) for $self->all_storages; 878 } 879 880 return $self->master->unsafe; 881} 882 883=head2 disable_sth_caching 884 885sets the L<DBIx::Class::Storage::DBI/disable_sth_caching> option on all storages 886or returns master's current setting 887 888=cut 889 890sub disable_sth_caching { 891 my $self = shift; 892 893 if (@_) { 894 $_->disable_sth_caching(@_) for $self->all_storages; 895 } 896 897 return $self->master->disable_sth_caching; 898} 899 900=head2 lag_behind_master 901 902returns the highest Replicant L<DBIx::Class::Storage::DBI/lag_behind_master> 903setting 904 905=cut 906 907sub lag_behind_master { 908 my $self = shift; 909 910 return max map $_->lag_behind_master, $self->replicants; 911} 912 913=head2 is_replicating 914 915returns true if all replicants return true for 916L<DBIx::Class::Storage::DBI/is_replicating> 917 918=cut 919 920sub is_replicating { 921 my $self = shift; 922 923 return (grep $_->is_replicating, $self->replicants) == ($self->replicants); 924} 925 926=head2 connect_call_datetime_setup 927 928calls L<DBIx::Class::Storage::DBI/connect_call_datetime_setup> for all storages 929 930=cut 931 932sub connect_call_datetime_setup { 933 my $self = shift; 934 $_->connect_call_datetime_setup for $self->all_storages; 935} 936 937sub _populate_dbh { 938 my $self = shift; 939 $_->_populate_dbh for $self->all_storages; 940} 941 942sub _connect { 943 my $self = shift; 944 $_->_connect for $self->all_storages; 945} 946 947sub _rebless { 948 my $self = shift; 949 $_->_rebless for $self->all_storages; 950} 951 952sub _determine_driver { 953 my $self = shift; 954 $_->_determine_driver for $self->all_storages; 955} 956 957sub _driver_determined { 958 my $self = shift; 959 960 if (@_) { 961 $_->_driver_determined(@_) for $self->all_storages; 962 } 963 964 return $self->master->_driver_determined; 965} 966 967sub _init { 968 my $self = shift; 969 970 $_->_init for $self->all_storages; 971} 972 973sub _run_connection_actions { 974 my $self = shift; 975 976 $_->_run_connection_actions for $self->all_storages; 977} 978 979sub _do_connection_actions { 980 my $self = shift; 981 982 if (@_) { 983 $_->_do_connection_actions(@_) for $self->all_storages; 984 } 985} 986 987sub connect_call_do_sql { 988 my $self = shift; 989 $_->connect_call_do_sql(@_) for $self->all_storages; 990} 991 992sub disconnect_call_do_sql { 993 my $self = shift; 994 $_->disconnect_call_do_sql(@_) for $self->all_storages; 995} 996 997sub _seems_connected { 998 my $self = shift; 999 1000 return min map $_->_seems_connected, $self->all_storages; 1001} 1002 1003sub _ping { 1004 my $self = shift; 1005 1006 return min map $_->_ping, $self->all_storages; 1007} 1008 1009=head1 GOTCHAS 1010 1011Due to the fact that replicants can lag behind a master, you must take care to 1012make sure you use one of the methods to force read queries to a master should 1013you need realtime data integrity. For example, if you insert a row, and then 1014immediately re-read it from the database (say, by doing $row->discard_changes) 1015or you insert a row and then immediately build a query that expects that row 1016to be an item, you should force the master to handle reads. Otherwise, due to 1017the lag, there is no certainty your data will be in the expected state. 1018 1019For data integrity, all transactions automatically use the master storage for 1020all read and write queries. Using a transaction is the preferred and recommended 1021method to force the master to handle all read queries. 1022 1023Otherwise, you can force a single query to use the master with the 'force_pool' 1024attribute: 1025 1026 my $row = $resultset->search(undef, {force_pool=>'master'})->find($pk); 1027 1028This attribute will safely be ignore by non replicated storages, so you can use 1029the same code for both types of systems. 1030 1031Lastly, you can use the L</execute_reliably> method, which works very much like 1032a transaction. 1033 1034For debugging, you can turn replication on/off with the methods L</set_reliable_storage> 1035and L</set_balanced_storage>, however this operates at a global level and is not 1036suitable if you have a shared Schema object being used by multiple processes, 1037such as on a web application server. You can get around this limitation by 1038using the Schema clone method. 1039 1040 my $new_schema = $schema->clone; 1041 $new_schema->set_reliable_storage; 1042 1043 ## $new_schema will use only the Master storage for all reads/writes while 1044 ## the $schema object will use replicated storage. 1045 1046=head1 AUTHOR 1047 1048 John Napiorkowski <john.napiorkowski@takkle.com> 1049 1050Based on code originated by: 1051 1052 Norbert Csongr�di <bert@cpan.org> 1053 Peter Sikl�si <einon@einon.hu> 1054 1055=head1 LICENSE 1056 1057You may distribute this code under the same terms as Perl itself. 1058 1059=cut 1060 1061__PACKAGE__->meta->make_immutable; 1062 10631; 1064