Commit 4a71d054 authored by David S. Miller
Browse files

Merge branch 'rhashtable-next'



Ying Xue says:

====================
Introduce rhashtable_lookup_insert routine

This series introduces rhashtable_lookup_insert() to guarantee that
looking up an object in the hash table and inserting it are done
atomically, so that rhashtable users do not need to take an extra lock
around search and insertion. For example, the TIPC socket code is the
first user to benefit from this enhancement.

v2 changes:
 - fix the issue of waking up the worker thread under a wrong condition
   in patch #2, as pointed out by Thomas.
 - move a comment from rhashtable_insert() to rhashtable_wakeup_worker()
   according to Thomas's suggestion in patch #2.
 - indent the third line of condition statement in
   rhashtable_wakeup_worker() to inner bracket in patch #2.
 - drop patch #3 of v1 series
 - fix an issue of being unable to remove an object from hash table in
   certain special case in patch #4.
 - add a new patch #5 to avoid unnecessary wakeups of the worker queue
   thread
 - add a new patch #6 to initialize the atomic "nelems" variable
 - adjust the "nelem_hint" value from 256 to 192 to avoid unnecessarily
   shrinking the hash table right from the start in patch #7.

v1 changes:
 Before rhashtable_lookup_insert() is introduced, the following
 optimizations first need to be done:
- simplify rhashtable_lookup by reusing rhashtable_lookup_compare()
- introduce rhashtable_wakeup_worker() to further reduce duplicated
  code in patch #2
- fix an issue in patch #3
- introduce rhashtable_lookup_insert(). In this version, we first use
  rhashtable_lookup() to search for a duplicate key in both the old and
  the new bucket table; second, we introduce another helper function,
  __rhashtable_insert(), to reduce the duplicated code between
  rhashtable_insert() and rhashtable_lookup_insert().
- add patch #5 to the series as it depends on the above patches. In
  this version, no change is made compared with its previous version.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
parents 7c1b7023 07f6c4bc
......@@ -113,7 +113,7 @@ struct rhashtable {
struct bucket_table __rcu *tbl;
struct bucket_table __rcu *future_tbl;
atomic_t nelems;
size_t shift;
atomic_t shift;
struct rhashtable_params p;
struct delayed_work run_work;
struct mutex mutex;
......@@ -168,6 +168,7 @@ int rhashtable_shrink(struct rhashtable *ht);
void *rhashtable_lookup(struct rhashtable *ht, const void *key);
void *rhashtable_lookup_compare(struct rhashtable *ht, const void *key,
bool (*compare)(void *, void *), void *arg);
bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj);
void rhashtable_destroy(struct rhashtable *ht);
......
......@@ -199,7 +199,8 @@ static struct bucket_table *bucket_table_alloc(struct rhashtable *ht,
bool rht_grow_above_75(const struct rhashtable *ht, size_t new_size)
{
/* Expand table when exceeding 75% load */
return atomic_read(&ht->nelems) > (new_size / 4 * 3);
/* Grow only while the table is over 75% full and still allowed to get
 * bigger. A max_shift of 0 means "no upper bound" (rhashtable_expand()
 * only enforces the limit when max_shift is non-zero), so growth must
 * remain possible in that case; otherwise stop once the current shift
 * has reached max_shift.
 */
return atomic_read(&ht->nelems) > (new_size / 4 * 3) &&
(!ht->p.max_shift || atomic_read(&ht->shift) < ht->p.max_shift);
}
EXPORT_SYMBOL_GPL(rht_grow_above_75);
......@@ -211,7 +212,8 @@ EXPORT_SYMBOL_GPL(rht_grow_above_75);
bool rht_shrink_below_30(const struct rhashtable *ht, size_t new_size)
{
/* Shrink table beneath 30% load */
return atomic_read(&ht->nelems) < (new_size * 3 / 10);
/* Same 30% load test, but additionally require the table's current
 * shift to be above p.min_shift before reporting that it may shrink.
 */
return atomic_read(&ht->nelems) < (new_size * 3 / 10) &&
(atomic_read(&ht->shift) > ht->p.min_shift);
}
EXPORT_SYMBOL_GPL(rht_shrink_below_30);
......@@ -318,14 +320,11 @@ int rhashtable_expand(struct rhashtable *ht)
ASSERT_RHT_MUTEX(ht);
if (ht->p.max_shift && ht->shift >= ht->p.max_shift)
return 0;
new_tbl = bucket_table_alloc(ht, old_tbl->size * 2);
if (new_tbl == NULL)
return -ENOMEM;
ht->shift++;
atomic_inc(&ht->shift);
/* Make insertions go into the new, empty table right away. Deletions
* and lookups will be attempted in both tables until we synchronize.
......@@ -421,9 +420,6 @@ int rhashtable_shrink(struct rhashtable *ht)
ASSERT_RHT_MUTEX(ht);
if (ht->shift <= ht->p.min_shift)
return 0;
new_tbl = bucket_table_alloc(ht, tbl->size / 2);
if (new_tbl == NULL)
return -ENOMEM;
......@@ -462,7 +458,7 @@ int rhashtable_shrink(struct rhashtable *ht)
/* Publish the new, valid hash table */
rcu_assign_pointer(ht->tbl, new_tbl);
ht->shift--;
atomic_dec(&ht->shift);
/* Wait for readers. No new readers will have references to the
* old hash table.
......@@ -492,8 +488,39 @@ static void rht_deferred_worker(struct work_struct *work)
mutex_unlock(&ht->mutex);
}
/* Schedule the deferred resize work when the grow or shrink decision
 * callback asks for it and the current and future tables are the same.
 */
static void rhashtable_wakeup_worker(struct rhashtable *ht)
{
	struct bucket_table *cur = rht_dereference_rcu(ht->tbl, ht);
	struct bucket_table *future = rht_dereference_rcu(ht->future_tbl, ht);
	size_t nbuckets = cur->size;

	/* Only adjust the table if no resizing is currently in progress. */
	if (cur != future)
		return;

	if ((ht->p.grow_decision && ht->p.grow_decision(ht, nbuckets)) ||
	    (ht->p.shrink_decision && ht->p.shrink_decision(ht, nbuckets)))
		schedule_delayed_work(&ht->run_work, 0);
}
/* Link @obj at the front of bucket @hash in @tbl, bump the element count
 * and give the resize worker a chance to run. Both callers visible in this
 * file invoke it with the bucket lock held.
 */
static void __rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj,
				struct bucket_table *tbl, u32 hash)
{
	struct rhash_head *first;

	first = rht_dereference_bucket(tbl->buckets[hash], tbl, hash);

	/* Chain to the existing first entry, or terminate the chain with a
	 * nulls marker when the bucket holds no entry yet.
	 */
	if (!rht_is_a_nulls(first))
		RCU_INIT_POINTER(obj->next, first);
	else
		INIT_RHT_NULLS_HEAD(obj->next, ht, hash);

	/* Publish @obj only after its next pointer has been set up. */
	rcu_assign_pointer(tbl->buckets[hash], obj);

	atomic_inc(&ht->nelems);

	rhashtable_wakeup_worker(ht);
}
/**
* rhashtable_insert - insert object into hash hash table
* rhashtable_insert - insert object into hash table
* @ht: hash table
* @obj: pointer to hash head inside object
*
......@@ -510,7 +537,6 @@ static void rht_deferred_worker(struct work_struct *work)
void rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj)
{
struct bucket_table *tbl;
struct rhash_head *head;
spinlock_t *lock;
unsigned hash;
......@@ -521,22 +547,9 @@ void rhashtable_insert(struct rhashtable *ht, struct rhash_head *obj)
lock = bucket_lock(tbl, hash);
spin_lock_bh(lock);
head = rht_dereference_bucket(tbl->buckets[hash], tbl, hash);
if (rht_is_a_nulls(head))
INIT_RHT_NULLS_HEAD(obj->next, ht, hash);
else
RCU_INIT_POINTER(obj->next, head);
rcu_assign_pointer(tbl->buckets[hash], obj);
__rhashtable_insert(ht, obj, tbl, hash);
spin_unlock_bh(lock);
atomic_inc(&ht->nelems);
/* Only grow the table if no resizing is currently in progress. */
if (ht->tbl != ht->future_tbl &&
ht->p.grow_decision && ht->p.grow_decision(ht, tbl->size))
schedule_delayed_work(&ht->run_work, 0);
rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rhashtable_insert);
......@@ -550,7 +563,7 @@ EXPORT_SYMBOL_GPL(rhashtable_insert);
* walk the bucket chain upon removal. The removal operation is thus
* considerably slow if the hash table is not correctly sized.
*
* Will automatically shrink the table via rhashtable_expand() if the the
* Will automatically shrink the table via rhashtable_shrink() if the
* shrink_decision function specified at rhashtable_init() returns true.
*
* The caller must ensure that no concurrent table mutations occur. It is
......@@ -584,20 +597,17 @@ restart:
spin_unlock_bh(lock);
if (ht->tbl != ht->future_tbl &&
ht->p.shrink_decision &&
ht->p.shrink_decision(ht, tbl->size))
schedule_delayed_work(&ht->run_work, 0);
rhashtable_wakeup_worker(ht);
rcu_read_unlock();
return true;
}
if (tbl != rht_dereference_rcu(ht->tbl, ht)) {
if (tbl != rht_dereference_rcu(ht->future_tbl, ht)) {
spin_unlock_bh(lock);
tbl = rht_dereference_rcu(ht->tbl, ht);
tbl = rht_dereference_rcu(ht->future_tbl, ht);
hash = head_hashfn(ht, tbl, obj);
lock = bucket_lock(tbl, hash);
......@@ -612,6 +622,19 @@ restart:
}
EXPORT_SYMBOL_GPL(rhashtable_remove);
/* Argument bundle passed to rhashtable_compare() through its opaque
 * second parameter.
 */
struct rhashtable_compare_arg {
	struct rhashtable *ht;	/* table supplying key_offset and key_len */
	const void *key;	/* key being searched for */
};

/* Compare callback: true when the key stored inside object @ptr matches
 * the key carried in @arg byte for byte.
 */
static bool rhashtable_compare(void *ptr, void *arg)
{
	struct rhashtable_compare_arg *cmp = arg;
	struct rhashtable *table = cmp->ht;
	const void *candidate = ptr + table->p.key_offset;

	return memcmp(candidate, cmp->key, table->p.key_len) == 0;
}
/**
* rhashtable_lookup - lookup key in hash table
* @ht: hash table
......@@ -621,38 +644,20 @@ EXPORT_SYMBOL_GPL(rhashtable_remove);
* for an entry with an identical key. The first matching entry is returned.
*
* This lookup function may only be used for fixed key hash table (key_len
* paramter set). It will BUG() if used inappropriately.
* parameter set). It will BUG() if used inappropriately.
*
* Lookups may occur in parallel with hashtable mutations and resizing.
*/
void *rhashtable_lookup(struct rhashtable *ht, const void *key)
{
const struct bucket_table *tbl, *old_tbl;
struct rhash_head *he;
u32 hash;
/* Table and key handed to rhashtable_compare() through the opaque
 * callback argument of rhashtable_lookup_compare().
 */
struct rhashtable_compare_arg arg = {
.ht = ht,
.key = key,
};
/* Keys are matched with memcmp() over p.key_len bytes, so it must be set. */
BUG_ON(!ht->p.key_len);
rcu_read_lock();
old_tbl = rht_dereference_rcu(ht->tbl, ht);
tbl = rht_dereference_rcu(ht->future_tbl, ht);
hash = key_hashfn(ht, key, ht->p.key_len);
restart:
rht_for_each_rcu(he, tbl, rht_bucket_index(tbl, hash)) {
if (memcmp(rht_obj(ht, he) + ht->p.key_offset, key,
ht->p.key_len))
continue;
rcu_read_unlock();
return rht_obj(ht, he);
}
if (unlikely(tbl != old_tbl)) {
tbl = old_tbl;
goto restart;
}
rcu_read_unlock();
return NULL;
/* Delegate the search to rhashtable_lookup_compare() using the
 * memcmp-based rhashtable_compare() callback.
 */
return rhashtable_lookup_compare(ht, key, &rhashtable_compare, &arg);
}
EXPORT_SYMBOL_GPL(rhashtable_lookup);
......@@ -700,6 +705,66 @@ restart:
}
EXPORT_SYMBOL_GPL(rhashtable_lookup_compare);
/**
* rhashtable_lookup_insert - lookup and insert object into hash table
* @ht: hash table
* @obj: pointer to hash head inside object
*
* Locks down the bucket chain in both the old and new table if a resize
* is in progress to ensure that writers can't remove from the old table
* and can't insert to the new table during the atomic operation of search
* and insertion. Searches for duplicates in both the old and new table if
* a resize is in progress.
*
* This lookup function may only be used for fixed key hash table (key_len
* parameter set). It will BUG() if used inappropriately.
*
* It is safe to call this function from atomic context.
*
* Will trigger an automatic deferred table resizing if the size grows
* beyond the watermark indicated by grow_decision() which can be passed
* to rhashtable_init().
*/
/* Returns true when @obj was inserted, false when rhashtable_lookup()
 * already finds an entry with the same key (nothing is inserted then).
 */
bool rhashtable_lookup_insert(struct rhashtable *ht, struct rhash_head *obj)
{
struct bucket_table *new_tbl, *old_tbl;
spinlock_t *new_bucket_lock, *old_bucket_lock;
u32 new_hash, old_hash;
bool success = true;
/* The duplicate search is key based, so a fixed key length is required. */
BUG_ON(!ht->p.key_len);
rcu_read_lock();
/* Lock the bucket @obj hashes to in the current table first. */
old_tbl = rht_dereference_rcu(ht->tbl, ht);
old_hash = head_hashfn(ht, old_tbl, obj);
old_bucket_lock = bucket_lock(old_tbl, old_hash);
spin_lock_bh(old_bucket_lock);
/* Then work out the bucket in the future table; its lock is only taken
 * when the future table differs from the current one, using the
 * RHT_LOCK_NESTED annotation because a bucket lock is already held.
 */
new_tbl = rht_dereference_rcu(ht->future_tbl, ht);
new_hash = head_hashfn(ht, new_tbl, obj);
new_bucket_lock = bucket_lock(new_tbl, new_hash);
if (unlikely(old_tbl != new_tbl))
spin_lock_bh_nested(new_bucket_lock, RHT_LOCK_NESTED);
/* Look the object's own key up; an existing match means a duplicate. */
if (rhashtable_lookup(ht, rht_obj(ht, obj) + ht->p.key_offset)) {
success = false;
goto exit;
}
/* No duplicate: the insertion always goes into the future table. */
__rhashtable_insert(ht, obj, new_tbl, new_hash);
exit:
/* Release the locks in the reverse order of acquisition. */
if (unlikely(old_tbl != new_tbl))
spin_unlock_bh(new_bucket_lock);
spin_unlock_bh(old_bucket_lock);
rcu_read_unlock();
return success;
}
EXPORT_SYMBOL_GPL(rhashtable_lookup_insert);
static size_t rounded_hashtable_size(struct rhashtable_params *params)
{
return max(roundup_pow_of_two(params->nelem_hint * 4 / 3),
......@@ -782,7 +847,8 @@ int rhashtable_init(struct rhashtable *ht, struct rhashtable_params *params)
if (tbl == NULL)
return -ENOMEM;
ht->shift = ilog2(tbl->size);
atomic_set(&ht->nelems, 0);
atomic_set(&ht->shift, ilog2(tbl->size));
RCU_INIT_POINTER(ht->tbl, tbl);
RCU_INIT_POINTER(ht->future_tbl, tbl);
......
......@@ -20,18 +20,6 @@ menuconfig TIPC
If in doubt, say N.
config TIPC_PORTS
int "Maximum number of ports in a node"
depends on TIPC
range 127 65535
default "8191"
help
Specifies how many ports can be supported by a node.
Can range from 127 to 65535 ports; default is 8191.
Setting this to a smaller value saves some memory,
setting it to higher allows for more ports.
config TIPC_MEDIA_IB
bool "InfiniBand media type support"
depends on TIPC && INFINIBAND_IPOIB
......
......@@ -183,22 +183,6 @@ static struct sk_buff *cfg_set_own_addr(void)
return tipc_cfg_reply_error_string("cannot change to network mode");
}
/* Handle a "set max ports" request. The TLV and the requested value are
 * validated, but the setting is never changed: the reply is an empty
 * success only when the requested value equals the current one, and an
 * error string otherwise.
 */
static struct sk_buff *cfg_set_max_ports(void)
{
	u32 requested;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	requested = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));

	/* Asking for the value already in effect is a no-op. */
	if (requested == tipc_max_ports)
		return tipc_cfg_reply_none();

	if (requested < 127 || requested > 65535)
		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
						   " (max ports must be 127-65535)");

	return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
		" (cannot change max ports while TIPC is active)");
}
static struct sk_buff *cfg_set_netid(void)
{
u32 value;
......@@ -285,15 +269,9 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
case TIPC_CMD_SET_NODE_ADDR:
rep_tlv_buf = cfg_set_own_addr();
break;
case TIPC_CMD_SET_MAX_PORTS:
rep_tlv_buf = cfg_set_max_ports();
break;
case TIPC_CMD_SET_NETID:
rep_tlv_buf = cfg_set_netid();
break;
case TIPC_CMD_GET_MAX_PORTS:
rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports);
break;
case TIPC_CMD_GET_NETID:
rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id);
break;
......@@ -317,6 +295,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
case TIPC_CMD_SET_REMOTE_MNG:
case TIPC_CMD_GET_REMOTE_MNG:
case TIPC_CMD_DUMP_LOG:
case TIPC_CMD_SET_MAX_PORTS:
case TIPC_CMD_GET_MAX_PORTS:
rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (obsolete command)");
break;
......
......@@ -34,6 +34,8 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include "core.h"
#include "name_table.h"
#include "subscr.h"
......@@ -47,7 +49,6 @@ int tipc_random __read_mostly;
/* configurable TIPC parameters */
u32 tipc_own_addr __read_mostly;
int tipc_max_ports __read_mostly;
int tipc_net_id __read_mostly;
int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */
......@@ -84,9 +85,9 @@ static void tipc_core_stop(void)
tipc_netlink_stop();
tipc_subscr_stop();
tipc_nametbl_stop();
tipc_sk_ref_table_stop();
tipc_socket_stop();
tipc_unregister_sysctl();
tipc_sk_rht_destroy();
}
/**
......@@ -98,7 +99,7 @@ static int tipc_core_start(void)
get_random_bytes(&tipc_random, sizeof(tipc_random));
err = tipc_sk_ref_table_init(tipc_max_ports, tipc_random);
err = tipc_sk_rht_init();
if (err)
goto out_reftbl;
......@@ -138,7 +139,7 @@ out_socket:
out_netlink:
tipc_nametbl_stop();
out_nametbl:
tipc_sk_ref_table_stop();
tipc_sk_rht_destroy();
out_reftbl:
return err;
}
......@@ -150,7 +151,6 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n");
tipc_own_addr = 0;
tipc_max_ports = CONFIG_TIPC_PORTS;
tipc_net_id = 4711;
sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
......
......@@ -37,8 +37,6 @@
#ifndef _TIPC_CORE_H
#define _TIPC_CORE_H
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include <linux/tipc.h>
#include <linux/tipc_config.h>
#include <linux/tipc_netlink.h>
......@@ -79,7 +77,6 @@ int tipc_snprintf(char *buf, int len, const char *fmt, ...);
* Global configuration variables
*/
extern u32 tipc_own_addr __read_mostly;
extern int tipc_max_ports __read_mostly;
extern int tipc_net_id __read_mostly;
extern int sysctl_tipc_rmem[3] __read_mostly;
extern int sysctl_tipc_named_timeout __read_mostly;
......
This diff is collapsed.
......@@ -46,8 +46,8 @@ int tipc_sk_rcv(struct sk_buff *buf);
struct sk_buff *tipc_sk_socks_show(void);
void tipc_sk_mcast_rcv(struct sk_buff *buf);
void tipc_sk_reinit(void);
int tipc_sk_ref_table_init(u32 requested_size, u32 start);
void tipc_sk_ref_table_stop(void);
int tipc_sk_rht_init(void);
void tipc_sk_rht_destroy(void);
int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment