PHP并发查询MySQL的实例代码

2019-05-01 04:55:34于丽

当一个请求需要执行多条互不依赖的查询时,如果逐条串行执行,总耗时是各条查询耗时之和。对于这种情况,可以使用IO多路复用同时轮询多个连接的IO,让这些查询并发执行。

PHP实现并发查询MySQL

PHP的mysqli(mysqlnd驱动)提供多路复用轮询IO(mysqli_poll)和异步查询(MYSQLI_ASYNC、mysqli_reap_async_query),使用这两个特性实现并发查询,示例代码:

<?php
/*
 * Demo: run several independent SELECT statements concurrently.
 *
 * One mysqli connection is opened per statement and the query is started with
 * MYSQLI_ASYNC. mysqli_poll() is then used to wait on all outstanding
 * connections at once, and each result is collected with reap_async_query()
 * as soon as its connection becomes readable. The first row of every result
 * set is printed, followed by the total elapsed time in milliseconds.
 */
$sqls = [
    'SELECT * FROM `mz_table_1` LIMIT 1000,10',
    'SELECT * FROM `mz_table_1` LIMIT 1010,10',
    'SELECT * FROM `mz_table_1` LIMIT 1020,10',
    'SELECT * FROM `mz_table_1` LIMIT 10000,10',
    'SELECT * FROM `mz_table_2` LIMIT 1',
    'SELECT * FROM `mz_table_2` LIMIT 5,1',
];

// Connections that still have a query in flight, keyed by server thread id.
$links = [];
$start = microtime(true);

// Open one connection per statement and start the query asynchronously.
foreach ($sqls as $sql) {
    $link = mysqli_connect('127.0.0.1', 'root', 'root', 'dbname', 3306);
    if (!$link) {
        throw new RuntimeException('MySQL connect failed: ' . mysqli_connect_error());
    }

    // With MYSQLI_ASYNC the call returns immediately; false means it could not be sent.
    if ($link->query($sql, MYSQLI_ASYNC) === false) {
        $message = $link->error;
        $link->close();
        throw new RuntimeException('Failed to start async query: ' . $message);
    }

    $links[$link->thread_id] = $link;
}

// Looping on the pending set (instead of a counter) also makes an empty $sqls a no-op.
while ($links) {
    // mysqli_poll() rewrites its array arguments, so hand it fresh copies each round.
    $r_array = $e_array = $reject = $links;

    $ret = mysqli_poll($r_array, $e_array, $reject, 2);
    if ($ret === false) {
        // Retrying after a hard failure would spin forever.
        throw new RuntimeException('mysqli_poll() failed');
    }
    if ($ret === 0) {
        // Timed out with nothing ready yet; poll again.
        continue;
    }

    // Collect the result of every connection that became readable.
    foreach ($r_array as $link) {
        $result = $link->reap_async_query();
        if ($result instanceof mysqli_result) {
            print_r($result->fetch_row());
            $result->free();
        } elseif ($result === false) {
            // Report the failure but keep draining the other connections.
            trigger_error('Async query failed: ' . $link->error, E_USER_WARNING);
        }
        // A result of true (statement without a result set) needs no further handling.

        // This connection is finished: drop it from the pending set and close it.
        unset($links[$link->thread_id]);
        $link->close();
    }

    // Connections flagged as errored or rejected are treated as fatal for this demo.
    if ($e_array) {
        throw new RuntimeException('mysqli_poll() reported ' . count($e_array) . ' connection(s) in error');
    }
    if ($reject) {
        throw new RuntimeException('mysqli_poll() rejected ' . count($reject) . ' connection(s)');
    }
}

// Total wall-clock time in whole milliseconds.
echo (int) round((microtime(true) - $start) * 1000), PHP_EOL;

mysqli_poll底层调用的mysqlnd_poll源码:

/*
 * php_select: on non-Windows builds this is a direct alias for select().
 * On Windows the definition is presumably supplied by win32/select.h
 * (not visible here -- confirm).
 */
#ifndef PHP_WIN32
#define php_select(m, r, w, e, t) select(m, r, w, e, t)
#else
#include "win32/select.h"
#endif
/* {{{ mysqlnd_poll */
/*
 * Waits, with a timeout, on the sockets behind the given connection arrays
 * using a single select() call.
 *
 * r_array   - connections to watch for readability; may be NULL.
 * e_array   - connections to watch for exceptional conditions; may be NULL.
 * dont_poll - out: receives the result of
 *             mysqlnd_stream_array_check_for_readiness(r_array); only
 *             written when r_array is non-NULL.
 * sec, usec - timeout; both must be non-negative. usec values above 999999
 *             are carried over into seconds.
 * desc_num  - out: receives the value returned by select().
 *
 * Returns (via DBG_RETURN) FAIL when the timeout is negative, when no
 * descriptor was collected from either array, or when select() returns -1;
 * otherwise PASS.
 *
 * NOTE(review): *dont_poll is read in the "no sets" branch even when r_array
 * is NULL, in which case this function never assigned it -- this relies on
 * the caller having initialised it; confirm against the caller.
 */
PHPAPI enum_func_status
mysqlnd_poll(MYSQLND **r_array, MYSQLND **e_array, MYSQLND ***dont_poll, long sec, long usec, int * desc_num)
{
 struct timeval tv;
 struct timeval *tv_p = NULL;
 fd_set   rfds, wfds, efds;
 php_socket_t max_fd = 0;
 int    retval, sets = 0;
 int    set_count, max_set_count = 0;
 DBG_ENTER("_mysqlnd_poll");
 /* Reject negative timeouts up front. */
 if (sec < 0 || usec < 0) {
  php_error_docref(NULL, E_WARNING, "Negative values passed for sec and/or usec");
  DBG_RETURN(FAIL);
 }
 /* wfds is cleared here and never populated: no write set is polled. */
 FD_ZERO(&rfds);
 FD_ZERO(&wfds);
 FD_ZERO(&efds);
 /* Collect the socket descriptors of the connections in the read array. */
 if (r_array != NULL) {
  /* Presumably separates out connections that must not be polled -- helper not visible here, confirm. */
  *dont_poll = mysqlnd_stream_array_check_for_readiness(r_array);
  set_count = mysqlnd_stream_array_to_fd_set(r_array, &rfds, &max_fd);
  if (set_count > max_set_count) {
   max_set_count = set_count;
  }
  sets += set_count;
 }
 /* Collect the socket descriptors of the connections in the error array. */
 if (e_array != NULL) {
  set_count = mysqlnd_stream_array_to_fd_set(e_array, &efds, &max_fd);
  if (set_count > max_set_count) {
   max_set_count = set_count;
  }
  sets += set_count;
 }
 /* Nothing to wait on: warn and fail instead of calling select() with empty sets. */
 if (!sets) {
  php_error_docref(NULL, E_WARNING, *dont_poll ? "All arrays passed are clear":"No stream arrays were passed");
  DBG_ERR_FMT(*dont_poll ? "All arrays passed are clear":"No stream arrays were passed");
  DBG_RETURN(FAIL);
 }
 PHP_SAFE_MAX_FD(max_fd, max_set_count);
 /* Build the select() timeout, carrying excess microseconds over into seconds. */
 if (usec > 999999) {
  tv.tv_sec = sec + (usec / 1000000);
  tv.tv_usec = usec % 1000000;
 } else {
  tv.tv_sec = sec;
  tv.tv_usec = usec;
 }
 /* A timeout is always supplied, so select() never blocks indefinitely. */
 tv_p = &tv;
 /* Wait for the collected descriptors; php_select is the select() wrapper defined above. */
 retval = php_select(max_fd + 1, &rfds, &wfds, &efds, tv_p);
 if (retval == -1) {
  php_error_docref(NULL, E_WARNING, "unable to select [%d]: %s (max_fd=%d)",
      errno, strerror(errno), max_fd);
  DBG_RETURN(FAIL);
 }
 /* Update the caller's arrays from the fd sets select() modified (presumably keeping only flagged connections -- confirm). */
 if (r_array != NULL) {
  mysqlnd_stream_array_from_fd_set(r_array, &rfds);
 }
 if (e_array != NULL) {
  mysqlnd_stream_array_from_fd_set(e_array, &efds);
 }
 /* Hand select()'s return value (number of ready descriptors) back to the caller. */
 *desc_num = retval;
 DBG_RETURN(PASS);
}
								 
			 
相关文章 大家在看